
HDFS-13616. Batch listing of multiple directories (#1725)

Chao Sun 5 years ago
parent
commit
d7c4f8ab21
25 changed files with 1249 additions and 5 deletions
  1. + 27 - 0    hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
  2. + 91 - 0    hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PartialListing.java
  3. + 5 - 0     hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java
  4. + 4 - 0     hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
  5. + 1 - 0     hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml
  6. + 20 - 1    hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
  7. + 111 - 0   hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
  8. + 62 - 0    hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BatchedDirectoryListing.java
  9. + 18 - 0    hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
  10. + 82 - 0   hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsPartialListing.java
  11. + 50 - 0   hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
  12. + 12 - 0   hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
  13. + 13 - 0   hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
  14. + 18 - 0   hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
  15. + 1 - 0    hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/protocol/TestReadOnly.java
  16. + 7 - 0    hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
  17. + 8 - 0    hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
  18. + 2 - 0    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
  19. + 60 - 2   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
  20. + 4 - 0    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
  21. + 175 - 2  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  22. + 23 - 0   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
  23. + 10 - 0   hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
  24. + 35 - 0   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/ListingBenchmark.java
  25. + 410 - 0  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBatchedListDirectories.java

+ 27 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java

@@ -2207,6 +2207,33 @@ public abstract class FileSystem extends Configured
     return new DirListingIterator<>(p);
   }
 
+  /**
+   * Batched listing API that returns {@link PartialListing}s for the
+   * passed Paths.
+   *
+   * @param paths List of paths to list.
+   * @return RemoteIterator that returns corresponding PartialListings.
+   * @throws IOException
+   */
+  public RemoteIterator<PartialListing<FileStatus>> batchedListStatusIterator(
+      final List<Path> paths) throws IOException {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  /**
+   * Batched listing API that returns {@link PartialListing}s for the passed
+   * Paths. The PartialListing will contain {@link LocatedFileStatus} entries
+   * with locations.
+   *
+   * @param paths List of paths to list.
+   * @return RemoteIterator that returns corresponding PartialListings.
+   * @throws IOException
+   */
+  public RemoteIterator<PartialListing<LocatedFileStatus>> batchedListLocatedStatusIterator(
+      final List<Path> paths) throws IOException {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
   /**
    * List the statuses and block locations of the files in the given path.
    * Does not guarantee to return the iterator that traverses statuses

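For context, a minimal usage sketch of the new API (not part of this commit): it assumes the default FileSystem resolves to a DistributedFileSystem, since the base FileSystem methods above only throw UnsupportedOperationException, and the /data paths are placeholders. batchedListLocatedStatusIterator works the same way when block locations are needed.

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.PartialListing;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public class BatchedListingUsageSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Assumes fs.defaultFS points at an HDFS cluster; only
    // DistributedFileSystem implements the batched listing methods.
    FileSystem fs = FileSystem.get(conf);
    List<Path> dirs = Arrays.asList(new Path("/data/a"), new Path("/data/b"));

    RemoteIterator<PartialListing<FileStatus>> it =
        fs.batchedListStatusIterator(dirs);
    while (it.hasNext()) {
      PartialListing<FileStatus> batch = it.next();
      try {
        // get() rethrows any per-path failure, e.g. FileNotFoundException.
        for (FileStatus status : batch.get()) {
          System.out.println(batch.getListedPath() + " -> " + status.getPath());
        }
      } catch (IOException e) {
        System.err.println("Failed to list " + batch.getListedPath() + ": " + e);
      }
    }
  }
}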
+ 91 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PartialListing.java

@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ipc.RemoteException;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A partial listing of the children of a parent directory. Since it is a
+ * partial listing, multiple PartialListings may need to be combined to obtain
+ * the full listing of a parent directory.
+ * <p/>
+ * PartialListing behaves similarly to a Future, in that getting the result via
+ * {@link #get()} will throw an Exception if there was a failure.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class PartialListing<T extends FileStatus> {
+  private final Path listedPath;
+  private final List<T> partialListing;
+  private final RemoteException exception;
+
+  public PartialListing(Path listedPath, List<T> partialListing) {
+    this(listedPath, partialListing, null);
+  }
+
+  public PartialListing(Path listedPath, RemoteException exception) {
+    this(listedPath, null, exception);
+  }
+
+  private PartialListing(Path listedPath, List<T> partialListing,
+      RemoteException exception) {
+    Preconditions.checkArgument(partialListing == null ^ exception == null);
+    this.partialListing = partialListing;
+    this.listedPath = listedPath;
+    this.exception = exception;
+  }
+
+  /**
+   * Partial listing of the path being listed. In the case where the path is
+   * a file, the list will be a singleton containing the file itself.
+   *
+   * @return Partial listing of the path being listed.
+   * @throws IOException if there was an exception getting the listing.
+   */
+  public List<T> get() throws IOException {
+    if (exception != null) {
+      throw exception.unwrapRemoteException();
+    }
+    return partialListing;
+  }
+
+  /**
+   * Path being listed.
+   *
+   * @return the path being listed.
+   */
+  public Path getListedPath() {
+    return listedPath;
+  }
+
+  @Override
+  public String toString() {
+    return new ToStringBuilder(this)
+        .append("listedPath", listedPath)
+        .append("partialListing", partialListing)
+        .append("exception", exception)
+        .toString();
+  }
+}

+ 5 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java

@@ -27,6 +27,7 @@ import java.lang.reflect.Modifier;
 import java.net.URI;
 import java.util.EnumSet;
 import java.util.Iterator;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
@@ -105,6 +106,10 @@ public class TestFilterFileSystem {
     public FileStatus[] listStatusBatch(Path f, byte[] token);
     public FileStatus[] listStatus(Path[] files);
     public FileStatus[] listStatus(Path[] files, PathFilter filter);
+    public RemoteIterator<PartialListing<LocatedFileStatus>> batchedListLocatedStatusIterator(
+        final List<Path> paths) throws IOException;
+    public RemoteIterator<PartialListing<FileStatus>> batchedListStatusIterator(
+        final List<Path> paths) throws IOException;
     public FileStatus[] globStatus(Path pathPattern);
     public FileStatus[] globStatus(Path pathPattern, PathFilter filter);
     public Iterator<LocatedFileStatus> listFiles(Path path,

+ 4 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java

@@ -124,6 +124,10 @@ public class TestHarFileSystem {
     public FileStatus[] listStatusBatch(Path f, byte[] token);
     public FileStatus[] listStatus(Path[] files);
     public FileStatus[] listStatus(Path[] files, PathFilter filter);
+    public RemoteIterator<PartialListing<LocatedFileStatus>> batchedListLocatedStatusIterator(
+        final List<Path> paths) throws IOException;
+    public RemoteIterator<PartialListing<FileStatus>> batchedListStatusIterator(
+        final List<Path> paths) throws IOException;
     public FileStatus[] globStatus(Path pathPattern);
     public FileStatus[] globStatus(Path pathPattern, PathFilter filter);
 

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml

@@ -6,6 +6,7 @@
       <Class name="org.apache.hadoop.hdfs.inotify.EventBatch"/>
       <Class name="org.apache.hadoop.hdfs.protocol.HdfsFileStatus"/>
       <Class name="org.apache.hadoop.hdfs.protocol.LocatedBlock"/>
+      <Class name="org.apache.hadoop.hdfs.protocol.BatchedDirectoryListing" />
       <Class name="org.apache.hadoop.hdfs.protocol.BlockStoragePolicy"/>
       <Class name="org.apache.hadoop.hdfs.protocol.CorruptFileBlocks"/>
       <Class name="org.apache.hadoop.hdfs.protocol.StripedBlockInfo"/>

+ 20 - 1
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -105,6 +105,7 @@ import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.AclException;
 import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
+import org.apache.hadoop.hdfs.protocol.BatchedDirectoryListing;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
@@ -1675,6 +1676,24 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     }
   }
 
+  /**
+   * Get a batched listing for the indicated directories.
+   *
+   * @see ClientProtocol#getBatchedListing(String[], byte[], boolean)
+   */
+  public BatchedDirectoryListing batchedListPaths(
+      String[] srcs, byte[] startAfter, boolean needLocation)
+      throws IOException {
+    checkOpen();
+    try {
+      return namenode.getBatchedListing(srcs, startAfter, needLocation);
+    } catch(RemoteException re) {
+      throw re.unwrapRemoteException(AccessControlException.class,
+          FileNotFoundException.class,
+          UnresolvedPathException.class);
+    }
+  }
+
   /**
    * Get the file info for a specific file or directory.
    * @param src The string representation of the path to the file
@@ -1694,7 +1713,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     }
   }
 
-  /**
+ /**
    * Get the file info for a specific file or directory.
    * @param src The string representation of the path to the file
    * @param needBlockToken Include block tokens in {@link LocatedBlocks}.

+ 111 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import org.apache.commons.collections.list.TreeList;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -49,6 +50,7 @@ import org.apache.hadoop.fs.FsStatus;
 import org.apache.hadoop.fs.GlobalStorageStatistics;
 import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider;
 import org.apache.hadoop.fs.InvalidPathHandleException;
+import org.apache.hadoop.fs.PartialListing;
 import org.apache.hadoop.fs.PathHandle;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Options;
@@ -73,6 +75,7 @@ import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.client.impl.CorruptFileBlockIterator;
 import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
+import org.apache.hadoop.hdfs.protocol.BatchedDirectoryListing;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
@@ -81,6 +84,7 @@ import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.HdfsPartialListing;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
@@ -108,6 +112,8 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.DelegationTokenIssuer;
 import org.apache.hadoop.util.ChunkedArrayList;
 import org.apache.hadoop.util.Progressable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 import java.io.FileNotFoundException;
@@ -120,6 +126,7 @@ import java.util.Collection;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Optional;
 
 import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
@@ -1292,6 +1299,110 @@ public class DistributedFileSystem extends FileSystem
     }
   }
 
+  @Override
+  public RemoteIterator<PartialListing<FileStatus>> batchedListStatusIterator(
+      final List<Path> paths) throws IOException {
+    List<Path> absPaths = Lists.newArrayListWithCapacity(paths.size());
+    for (Path p : paths) {
+      absPaths.add(fixRelativePart(p));
+    }
+    return new PartialListingIterator<>(absPaths, false);
+  }
+
+  @Override
+  public RemoteIterator<PartialListing<LocatedFileStatus>> batchedListLocatedStatusIterator(
+      final List<Path> paths) throws IOException {
+    List<Path> absPaths = Lists.newArrayListWithCapacity(paths.size());
+    for (Path p : paths) {
+      absPaths.add(fixRelativePart(p));
+    }
+    return new PartialListingIterator<>(absPaths, true);
+  }
+
+  private static final Logger LBI_LOG =
+      LoggerFactory.getLogger(PartialListingIterator.class);
+
+  private class PartialListingIterator<T extends FileStatus>
+      implements RemoteIterator<PartialListing<T>> {
+
+    private List<Path> paths;
+    private String[] srcs;
+    private boolean needLocation;
+    private BatchedDirectoryListing batchedListing;
+    private int listingIdx = 0;
+
+    PartialListingIterator(List<Path> paths, boolean needLocation)
+        throws IOException {
+      this.paths = paths;
+      this.srcs = new String[paths.size()];
+      for (int i = 0; i < paths.size(); i++) {
+        this.srcs[i] = getPathName(paths.get(i));
+      }
+      this.needLocation = needLocation;
+
+      // Do the first listing
+      statistics.incrementReadOps(1);
+      storageStatistics.incrementOpCounter(OpType.LIST_LOCATED_STATUS);
+      batchedListing = dfs.batchedListPaths(
+          srcs, HdfsFileStatus.EMPTY_NAME, needLocation);
+      LBI_LOG.trace("Got batchedListing: {}", batchedListing);
+      if (batchedListing == null) { // one or more paths do not exist
+        throw new FileNotFoundException("One or more paths do not exist.");
+      }
+    }
+
+    @Override
+    public boolean hasNext() throws IOException {
+      if (batchedListing == null) {
+        return false;
+      }
+      // If we're done with the current batch, try to get the next batch
+      if (listingIdx >= batchedListing.getListings().length) {
+        if (!batchedListing.hasMore()) {
+          LBI_LOG.trace("No more elements");
+          return false;
+        }
+        batchedListing = dfs.batchedListPaths(
+            srcs, batchedListing.getStartAfter(), needLocation);
+        LBI_LOG.trace("Got batchedListing: {}", batchedListing);
+        listingIdx = 0;
+      }
+      return listingIdx < batchedListing.getListings().length;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public PartialListing<T> next() throws IOException {
+      if (!hasNext()) {
+        throw new NoSuchElementException("No more entries");
+      }
+      HdfsPartialListing listing = batchedListing.getListings()[listingIdx];
+      listingIdx++;
+
+      Path parent = paths.get(listing.getParentIdx());
+
+      if (listing.getException() != null) {
+        return new PartialListing<>(parent, listing.getException());
+      }
+
+      // Qualify paths for the client.
+      List<HdfsFileStatus> statuses = listing.getPartialListing();
+      List<T> qualifiedStatuses =
+          Lists.newArrayListWithCapacity(statuses.size());
+
+      for (HdfsFileStatus status : statuses) {
+        if (needLocation) {
+          qualifiedStatuses.add((T)((HdfsLocatedFileStatus) status)
+              .makeQualifiedLocated(getUri(), parent));
+        } else {
+          qualifiedStatuses.add((T)status.makeQualified(getUri(), parent));
+        }
+      }
+
+      return new PartialListing<>(parent, qualifiedStatuses);
+    }
+  }
+
   /**
    * Create a directory, only when the parent directories exist.
    *

+ 62 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BatchedDirectoryListing.java

@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * A struct-like class for holding partial listings returned by the batched
+ * listing API. This class is used internally by the HDFS client and namenode
+ * and is not meant for public consumption.
+ */
+@InterfaceAudience.Private
+public class BatchedDirectoryListing {
+
+  private final HdfsPartialListing[] listings;
+  private final boolean hasMore;
+  private final byte[] startAfter;
+
+  public BatchedDirectoryListing(HdfsPartialListing[] listings,
+      boolean hasMore, byte[] startAfter) {
+    this.listings = listings;
+    this.hasMore = hasMore;
+    this.startAfter = startAfter;
+  }
+
+  public HdfsPartialListing[] getListings() {
+    return listings;
+  }
+
+  public boolean hasMore() {
+    return hasMore;
+  }
+
+  public byte[] getStartAfter() {
+    return startAfter;
+  }
+
+  @Override
+  public String toString() {
+    return new ToStringBuilder(this)
+        .append("listings", listings)
+        .append("hasMore", hasMore)
+        .append("startAfter", startAfter)
+        .toString();
+  }
+}

+ 18 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java

@@ -697,6 +697,24 @@ public interface ClientProtocol {
   DirectoryListing getListing(String src, byte[] startAfter,
       boolean needLocation) throws IOException;
 
+  /**
+   * Get a partial listing of the input directories.
+   *
+   * @param srcs the input directories
+   * @param startAfter the name to start listing after encoded in Java UTF8
+   * @param needLocation if the FileStatus should contain block locations
+   *
+   * @return a partial listing starting after startAfter. null if the input is
+   *   empty
+   * @throws IOException if an I/O error occurred
+   */
+  @Idempotent
+  @ReadOnly(isCoordinated = true)
+  BatchedDirectoryListing getBatchedListing(
+      String[] srcs,
+      byte[] startAfter,
+      boolean needLocation) throws IOException;
+
   /**
    * Get the list of snapshottable directories that are owned
    * by the current user. Return all the snapshottable directories if the

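To make the RPC paging contract concrete, here is a hedged sketch (not part of this commit) of driving getBatchedListing through DFSClient#batchedListPaths; it mirrors the PartialListingIterator in DistributedFileSystem above, and the DFSClient instance and source paths are assumed to already exist.

import java.io.IOException;

import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.protocol.BatchedDirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsPartialListing;

public class BatchedListingPagingSketch {
  // Pages through every entry of the given source paths, the same way
  // DistributedFileSystem.PartialListingIterator does internally.
  static void listAll(DFSClient client, String[] srcs) throws IOException {
    BatchedDirectoryListing batch =
        client.batchedListPaths(srcs, HdfsFileStatus.EMPTY_NAME, false);
    while (batch != null) {
      for (HdfsPartialListing listing : batch.getListings()) {
        if (listing.getException() != null) {
          // Per-path failures are reported instead of failing the whole call.
          System.err.println(srcs[listing.getParentIdx()] + ": "
              + listing.getException().getMessage());
          continue;
        }
        for (HdfsFileStatus status : listing.getPartialListing()) {
          System.out.println(srcs[listing.getParentIdx()] + "/"
              + status.getLocalName());
        }
      }
      if (!batch.hasMore()) {
        break;
      }
      // hasMore() implies a non-empty startAfter cookie to resume from.
      batch = client.batchedListPaths(srcs, batch.getStartAfter(), false);
    }
  }
}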
+ 82 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsPartialListing.java

@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.protocol;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.ipc.RemoteException;
+
+import java.util.List;
+
+/**
+ * A partial listing returned by the batched listing API. This is used
+ * internally by the HDFS client and namenode and is not meant for public
+ * consumption.
+ */
+@InterfaceAudience.Private
+public class HdfsPartialListing {
+
+  private final List<HdfsFileStatus> partialListing;
+  private final int parentIdx;
+  private final RemoteException exception;
+
+  public HdfsPartialListing(
+      int parentIdx,
+      List<HdfsFileStatus> partialListing) {
+    this(parentIdx, partialListing, null);
+  }
+
+  public HdfsPartialListing(
+      int parentIdx,
+      RemoteException exception) {
+    this(parentIdx, null, exception);
+  }
+
+  private HdfsPartialListing(
+      int parentIdx,
+      List<HdfsFileStatus> partialListing,
+      RemoteException exception) {
+    Preconditions.checkArgument(partialListing == null ^ exception == null);
+    this.parentIdx = parentIdx;
+    this.partialListing = partialListing;
+    this.exception = exception;
+  }
+
+  public int getParentIdx() {
+    return parentIdx;
+  }
+
+  public List<HdfsFileStatus> getPartialListing() {
+    return partialListing;
+  }
+
+  public RemoteException getException() {
+    return exception;
+  }
+
+  @Override
+  public String toString() {
+    return new ToStringBuilder(this)
+        .append("partialListing", partialListing)
+        .append("parentIdx", parentIdx)
+        .append("exception", exception)
+        .toString();
+  }
+}

+ 50 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java

@@ -53,6 +53,7 @@ import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceStateProto;
 import org.apache.hadoop.hdfs.AddBlockFlag;
 import org.apache.hadoop.hdfs.inotify.EventBatchList;
 import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
+import org.apache.hadoop.hdfs.protocol.BatchedDirectoryListing;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
@@ -63,6 +64,7 @@ import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.HdfsPartialListing;
 import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
@@ -114,6 +116,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Disall
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FinalizeUpgradeRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBatchedListingRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBatchedListingResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetContentSummaryRequestProto;
@@ -216,6 +220,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodin
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.SetErasureCodingPolicyRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.UnsetErasureCodingPolicyRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.CodecProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BatchedDirectoryListingProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ErasureCodingPolicyProto;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsRequestProto;
@@ -233,6 +238,7 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.ProtocolMetaInterface;
 import org.apache.hadoop.ipc.ProtocolTranslator;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.RpcClientUtil;
 import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto;
 import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto;
@@ -686,6 +692,50 @@ public class ClientNamenodeProtocolTranslatorPB implements
     }
   }
 
+  @Override
+  public BatchedDirectoryListing getBatchedListing(
+      String[] srcs, byte[] startAfter, boolean needLocation)
+      throws IOException {
+    GetBatchedListingRequestProto req = GetBatchedListingRequestProto
+        .newBuilder()
+        .addAllPaths(Arrays.asList(srcs))
+        .setStartAfter(ByteString.copyFrom(startAfter))
+        .setNeedLocation(needLocation).build();
+    try {
+      GetBatchedListingResponseProto result =
+          rpcProxy.getBatchedListing(null, req);
+
+      if (result.getListingsCount() > 0) {
+        HdfsPartialListing[] listingArray =
+            new HdfsPartialListing[result.getListingsCount()];
+        int listingIdx = 0;
+        for (BatchedDirectoryListingProto proto : result.getListingsList()) {
+          HdfsPartialListing listing;
+          if (proto.hasException()) {
+            HdfsProtos.RemoteExceptionProto reProto = proto.getException();
+            RemoteException ex = new RemoteException(
+                reProto.getClassName(), reProto.getMessage());
+            listing = new HdfsPartialListing(proto.getParentIdx(), ex);
+          } else {
+            List<HdfsFileStatus> statuses =
+                PBHelperClient.convertHdfsFileStatus(
+                    proto.getPartialListingList());
+            listing = new HdfsPartialListing(proto.getParentIdx(), statuses);
+          }
+          listingArray[listingIdx++] = listing;
+        }
+        BatchedDirectoryListing batchedListing =
+            new BatchedDirectoryListing(listingArray, result.getHasMore(),
+                result.getStartAfter().toByteArray());
+        return batchedListing;
+      }
+      return null;
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+
   @Override
   public void renewLease(String clientName) throws IOException {
     RenewLeaseRequestProto req = RenewLeaseRequestProto.newBuilder()

+ 12 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java

@@ -2063,6 +2063,18 @@ public class PBHelperClient {
     return result;
   }
 
+  public static List<HdfsFileStatus> convertHdfsFileStatus(
+      List<HdfsFileStatusProto> fs) {
+    if (fs == null) {
+      return null;
+    }
+    List<HdfsFileStatus> result = Lists.newArrayListWithCapacity(fs.size());
+    for (HdfsFileStatusProto proto : fs) {
+      result.add(convert(proto));
+    }
+    return result;
+  }
+
   // The creatFlag field in PB is a bitmask whose values are the same a the
   // emum values of CreateFlag
   public static int convertCreateFlag(EnumSetWritable<CreateFlag> flag) {

+ 13 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto

@@ -285,6 +285,18 @@ message GetListingResponseProto {
   optional DirectoryListingProto dirList = 1;
 }
 
+message GetBatchedListingRequestProto {
+  repeated string paths = 1;
+  required bytes startAfter = 2;
+  required bool needLocation = 3;
+}
+
+message GetBatchedListingResponseProto {
+  repeated BatchedDirectoryListingProto listings = 1;
+  required bool hasMore = 2;
+  required bytes startAfter = 3;
+}
+
 message GetSnapshottableDirListingRequestProto { // no input parameters
 }
 message GetSnapshottableDirListingResponseProto {
@@ -887,6 +899,7 @@ service ClientNamenodeProtocol {
   rpc delete(DeleteRequestProto) returns(DeleteResponseProto);
   rpc mkdirs(MkdirsRequestProto) returns(MkdirsResponseProto);
   rpc getListing(GetListingRequestProto) returns(GetListingResponseProto);
+  rpc getBatchedListing (GetBatchedListingRequestProto) returns (GetBatchedListingResponseProto);
   rpc renewLease(RenewLeaseRequestProto) returns(RenewLeaseResponseProto);
   rpc recoverLease(RecoverLeaseRequestProto)
       returns(RecoverLeaseResponseProto);

+ 18 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto

@@ -274,6 +274,12 @@ message LocatedBlockProto {
   repeated hadoop.common.TokenProto blockTokens = 10; // each internal block has a block token
 }
 
+message BatchedListingKeyProto {
+  required bytes checksum = 1;
+  required uint32 pathIndex = 2;
+  required bytes startAfter = 3;
+}
+
 message DataEncryptionKeyProto {
   required uint32 keyId = 1;
   required string blockPoolId = 2;
@@ -526,6 +532,18 @@ message DirectoryListingProto {
   required uint32 remainingEntries  = 2;
 }
 
+message RemoteExceptionProto {
+  required string className = 1;
+  optional string message = 2;
+}
+
+// Directory listing result for a batched listing call.
+message BatchedDirectoryListingProto {
+  repeated HdfsFileStatusProto partialListing = 1;
+  required uint32 parentIdx = 2;
+  optional RemoteExceptionProto exception = 3;
+}
+
 /**
  * Status of a snapshottable directory: besides the normal information for 
  * a directory status, also include snapshot quota, number of snapshots, and

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/protocol/TestReadOnly.java

@@ -39,6 +39,7 @@ public class TestReadOnly {
           "getStoragePolicies",
           "getStoragePolicy",
           "getListing",
+          "getBatchedListing",
           "getSnapshottableDirListing",
           "getPreferredBlockSize",
           "listCorruptFileBlocks",

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java

@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.inotify.EventBatchList;
 import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
+import org.apache.hadoop.hdfs.protocol.BatchedDirectoryListing;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
@@ -855,6 +856,12 @@ public class RouterClientProtocol implements ClientProtocol {
     return new DirectoryListing(combinedData, remainingEntries);
   }
 
+  @Override
+  public BatchedDirectoryListing getBatchedListing(String[] srcs,
+      byte[] startAfter, boolean needLocation) throws IOException {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
   @Override
   public HdfsFileStatus getFileInfo(String src) throws IOException {
     rpcServer.checkOperation(NameNode.OperationCategory.READ);

+ 8 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java

@@ -65,6 +65,7 @@ import org.apache.hadoop.hdfs.AddBlockFlag;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.inotify.EventBatchList;
 import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
+import org.apache.hadoop.hdfs.protocol.BatchedDirectoryListing;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
@@ -827,6 +828,13 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
     return clientProto.getListing(src, startAfter, needLocation);
   }
 
+  @Override
+  public BatchedDirectoryListing getBatchedListing(
+      String[] srcs, byte[] startAfter, boolean needLocation)
+      throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
   @Override // ClientProtocol
   public HdfsFileStatus getFileInfo(String src) throws IOException {
     return clientProto.getFileInfo(src);

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -385,6 +385,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
 
   public static final String  DFS_LIST_LIMIT = "dfs.ls.limit";
   public static final int     DFS_LIST_LIMIT_DEFAULT = 1000;
+  public static final String  DFS_NAMENODE_BATCHED_LISTING_LIMIT = "dfs.batched.ls.limit";
+  public static final int     DFS_NAMENODE_BATCHED_LISTING_LIMIT_DEFAULT = 100;
   public static final String  DFS_CONTENT_SUMMARY_LIMIT_KEY = "dfs.content-summary.limit";
   public static final int     DFS_CONTENT_SUMMARY_LIMIT_DEFAULT = 5000;
   public static final String  DFS_CONTENT_SUMMARY_SLEEP_MICROSEC_KEY = "dfs.content-summary.sleep-microsec";

+ 60 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java

@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import com.google.protobuf.ByteString;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
@@ -40,6 +41,7 @@ import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.QuotaUsage;
 import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
+import org.apache.hadoop.hdfs.protocol.BatchedDirectoryListing;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
@@ -47,6 +49,7 @@ import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.HdfsPartialListing;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
@@ -111,6 +114,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncR
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBatchedListingRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBatchedListingResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto.Builder;
@@ -262,11 +267,13 @@ import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.UnsetErasureCod
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.UnsetErasureCodingPolicyResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingCodecsRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingCodecsResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.*;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BatchedDirectoryListingProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockStoragePolicyProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RemoteExceptionProto;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsRequestProto;
@@ -279,6 +286,7 @@ import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto;
 import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenResponseProto;
 import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto;
@@ -343,6 +351,13 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
   private static final GetListingResponseProto VOID_GETLISTING_RESPONSE = 
   GetListingResponseProto.newBuilder().build();
 
+  private static final GetBatchedListingResponseProto
+      VOID_GETBATCHEDLISTING_RESPONSE =
+      GetBatchedListingResponseProto.newBuilder()
+          .setStartAfter(ByteString.copyFromUtf8(""))
+          .setHasMore(false)
+          .build();
+
   private static final RenewLeaseResponseProto VOID_RENEWLEASE_RESPONSE = 
   RenewLeaseResponseProto.newBuilder().build();
 
@@ -742,7 +757,50 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
       throw new ServiceException(e);
     }
   }
-  
+
+  @Override
+  public GetBatchedListingResponseProto getBatchedListing(
+      RpcController controller,
+      GetBatchedListingRequestProto request) throws ServiceException {
+    try {
+      BatchedDirectoryListing result = server.getBatchedListing(
+          request.getPathsList().toArray(new String[] {}),
+          request.getStartAfter().toByteArray(),
+          request.getNeedLocation());
+      if (result != null) {
+        GetBatchedListingResponseProto.Builder builder =
+            GetBatchedListingResponseProto.newBuilder();
+        for (HdfsPartialListing partialListing : result.getListings()) {
+          BatchedDirectoryListingProto.Builder listingBuilder =
+              BatchedDirectoryListingProto.newBuilder();
+          if (partialListing.getException() != null) {
+            RemoteException ex = partialListing.getException();
+            RemoteExceptionProto.Builder rexBuilder =
+                RemoteExceptionProto.newBuilder();
+            rexBuilder.setClassName(ex.getClassName());
+            if (ex.getMessage() != null) {
+              rexBuilder.setMessage(ex.getMessage());
+            }
+            listingBuilder.setException(rexBuilder.build());
+          } else {
+            for (HdfsFileStatus f : partialListing.getPartialListing()) {
+              listingBuilder.addPartialListing(PBHelperClient.convert(f));
+            }
+          }
+          listingBuilder.setParentIdx(partialListing.getParentIdx());
+          builder.addListings(listingBuilder);
+        }
+        builder.setHasMore(result.hasMore());
+        builder.setStartAfter(ByteString.copyFrom(result.getStartAfter()));
+        return builder.build();
+      } else {
+        return VOID_GETBATCHEDLISTING_RESPONSE;
+      }
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
   @Override
   public RenewLeaseResponseProto renewLease(RpcController controller,
       RenewLeaseRequestProto req) throws ServiceException {

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -248,6 +248,10 @@ public class FSDirectory implements Closeable {
     return this.dirLock.getWriteHoldCount();
   }
 
+  public int getListLimit() {
+    return lsLimit;
+  }
+
   @VisibleForTesting
   public final EncryptionZoneManager ezManager;
 

+ 175 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -98,6 +98,11 @@ import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState.ACTIVE;
 import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState.OBSERVER;
 
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
+
+import com.google.common.collect.Maps;
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.hdfs.protocol.BatchedDirectoryListing;
+import org.apache.hadoop.hdfs.protocol.HdfsPartialListing;
 import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
 import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats;
 import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats;
@@ -108,6 +113,7 @@ import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.server.common.ECTopologyVerifier;
 import org.apache.hadoop.hdfs.server.namenode.metrics.ReplicatedBlocksMBean;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
+import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
 import org.apache.hadoop.util.Time;
 import static org.apache.hadoop.util.Time.now;
 import static org.apache.hadoop.util.Time.monotonicNow;
@@ -127,6 +133,9 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.nio.file.Files;
+import java.security.GeneralSecurityException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -136,6 +145,7 @@ import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -225,6 +235,7 @@ import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.SnapshotException;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BatchedListingKeyProto;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
@@ -289,7 +300,7 @@ import org.apache.hadoop.hdfs.web.JsonUtil;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.CallerContext;
-import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.ipc.RetryCache;
 import org.apache.hadoop.ipc.Server;
@@ -531,6 +542,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
   private final long minBlockSize;         // minimum block size
   final long maxBlocksPerFile;     // maximum # of blocks per file
+
+  // Maximum number of paths that can be listed per batched call.
+  private final int batchedListingLimit;
+
   private final int numCommittedAllowed;
 
   /** Lock to protect FSNamesystem. */
@@ -599,6 +614,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    */
   private final Object metaSaveLock = new Object();
 
+  private final MessageDigest digest;
+
   /**
    * Notify that loading of this FSDirectory is complete, and
    * it is imageLoaded for use
@@ -822,6 +839,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
             + DFS_CHECKSUM_TYPE_KEY + ": " + checksumTypeStr);
       }
 
+      try {
+        digest = MessageDigest.getInstance("MD5");
+      } catch (NoSuchAlgorithmException e) {
+        throw new IOException("Algorithm 'MD5' not found");
+      }
+
       this.serverDefaults = new FsServerDefaults(
           conf.getLongBytes(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT),
           conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY, DFS_BYTES_PER_CHECKSUM_DEFAULT),
@@ -844,6 +867,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
           DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_DEFAULT);
       this.maxBlocksPerFile = conf.getLong(DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY,
           DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_DEFAULT);
+      this.batchedListingLimit = conf.getInt(
+          DFSConfigKeys.DFS_NAMENODE_BATCHED_LISTING_LIMIT,
+          DFSConfigKeys.DFS_NAMENODE_BATCHED_LISTING_LIMIT_DEFAULT);
+      Preconditions.checkArgument(
+          batchedListingLimit > 0,
+          DFSConfigKeys.DFS_NAMENODE_BATCHED_LISTING_LIMIT +
+              " must be greater than zero");
       this.numCommittedAllowed = conf.getInt(
           DFSConfigKeys.DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY,
           DFSConfigKeys.DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_DEFAULT);
@@ -897,7 +927,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       alwaysUseDelegationTokensForTests = conf.getBoolean(
           DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
           DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT);
-      
+
       this.dtSecretManager = createDelegationTokenSecretManager(conf);
       this.dir = new FSDirectory(this, conf);
       this.snapshotManager = new SnapshotManager(conf, dir);
@@ -3926,6 +3956,149 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return dl;
   }
 
+  public byte[] getSrcPathsHash(String[] srcs) {
+    synchronized (digest) {
+      for (String src : srcs) {
+        digest.update(src.getBytes(Charsets.UTF_8));
+      }
+      byte[] result = digest.digest();
+      digest.reset();
+      return result;
+    }
+  }
+
+  BatchedDirectoryListing getBatchedListing(String[] srcs, byte[] startAfter,
+      boolean needLocation) throws IOException {
+
+    if (srcs.length > this.batchedListingLimit) {
+      String msg = String.format("Too many source paths (%d > %d)",
+          srcs.length, batchedListingLimit);
+      throw new IllegalArgumentException(msg);
+    }
+
+    // Parse the startAfter key if present
+    int srcsIndex = 0;
+    byte[] indexStartAfter = new byte[0];
+
+    if (startAfter.length > 0) {
+      BatchedListingKeyProto startAfterProto =
+          BatchedListingKeyProto.parseFrom(startAfter);
+      // Validate that the passed paths match the checksum from key
+      Preconditions.checkArgument(
+          Arrays.equals(
+              startAfterProto.getChecksum().toByteArray(),
+              getSrcPathsHash(srcs)));
+      srcsIndex = startAfterProto.getPathIndex();
+      indexStartAfter = startAfterProto.getStartAfter().toByteArray();
+      // Special case: if the indexStartAfter key is an empty array, it
+      // means the last element we listed was a file, not a directory.
+      // Skip it so we don't list it twice.
+      if (indexStartAfter.length == 0) {
+        srcsIndex++;
+      }
+    }
+    final int startSrcsIndex = srcsIndex;
+    final String operationName = "listStatus";
+    final FSPermissionChecker pc = getPermissionChecker();
+
+    BatchedDirectoryListing bdl;
+
+    checkOperation(OperationCategory.READ);
+    readLock();
+    try {
+      checkOperation(NameNode.OperationCategory.READ);
+
+      // List all directories from the starting index until we've reached
+      // ls limit OR finished listing all srcs.
+      LinkedHashMap<Integer, HdfsPartialListing> listings =
+          Maps.newLinkedHashMap();
+      DirectoryListing lastListing = null;
+      int numEntries = 0;
+      for (; srcsIndex < srcs.length; srcsIndex++) {
+        String src = srcs[srcsIndex];
+        HdfsPartialListing listing;
+        try {
+          DirectoryListing dirListing =
+              getListingInt(dir, pc, src, indexStartAfter, needLocation);
+          if (dirListing == null) {
+            throw new FileNotFoundException("Path " + src + " does not exist");
+          }
+          listing = new HdfsPartialListing(
+              srcsIndex, Lists.newArrayList(dirListing.getPartialListing()));
+          numEntries += listing.getPartialListing().size();
+          lastListing = dirListing;
+        } catch (Exception e) {
+          if (e instanceof AccessControlException) {
+            logAuditEvent(false, operationName, src);
+          }
+          listing = new HdfsPartialListing(
+              srcsIndex,
+              new RemoteException(
+                  e.getClass().getCanonicalName(),
+                  e.getMessage()));
+          lastListing = null;
+          LOG.info("Exception listing src {}", src, e);
+        }
+
+        listings.put(srcsIndex, listing);
+        // Null out the indexStartAfter after the first time.
+        // If we get a partial result, we're done iterating because we're also
+        // over the list limit.
+        if (indexStartAfter.length != 0) {
+          indexStartAfter = new byte[0];
+        }
+        // Terminate if we've reached the maximum listing size
+        if (numEntries >= dir.getListLimit()) {
+          break;
+        }
+      }
+
+      HdfsPartialListing[] partialListingArray =
+          listings.values().toArray(new HdfsPartialListing[] {});
+
+      // Check whether there are more dirs/files to be listed, and if so, set
+      // up the index to start within the first dir to be listed next time.
+      if (srcsIndex >= srcs.length) {
+        // If the loop finished normally, there are no more srcs and we're done.
+        bdl = new BatchedDirectoryListing(
+            partialListingArray,
+            false,
+            new byte[0]);
+      } else if (srcsIndex == srcs.length-1 &&
+          lastListing != null &&
+          !lastListing.hasMore()) {
+        // If we're on the last srcsIndex, then we might be done exactly on an
+        // lsLimit boundary.
+        bdl = new BatchedDirectoryListing(
+            partialListingArray,
+            false,
+            new byte[0]
+        );
+      } else {
+        byte[] lastName = lastListing != null && lastListing.getLastName() !=
+            null ? lastListing.getLastName() : new byte[0];
+        BatchedListingKeyProto proto = BatchedListingKeyProto.newBuilder()
+            .setChecksum(ByteString.copyFrom(getSrcPathsHash(srcs)))
+            .setPathIndex(srcsIndex)
+            .setStartAfter(ByteString.copyFrom(lastName))
+            .build();
+        byte[] returnedStartAfter = proto.toByteArray();
+
+        // Set the startAfter key if the last listing has more entries
+        bdl = new BatchedDirectoryListing(
+            partialListingArray,
+            true,
+            returnedStartAfter);
+      }
+    } finally {
+      readUnlock(operationName);
+    }
+    for (int i = startSrcsIndex; i < srcsIndex; i++) {
+      logAuditEvent(true, operationName, srcs[i]);
+    }
+    return bdl;
+  }
+
   /////////////////////////////////////////////////////////
   //
   // These methods are called by datanodes

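As a companion to the code above, a minimal sketch (not part of this commit) of how the opaque startAfter cookie round-trips: it restates getSrcPathsHash and the BatchedListingKeyProto handling in isolation, with hypothetical srcs and lastName inputs.

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BatchedListingKeyProto;

public class BatchedListingCookieSketch {

  // Same scheme as FSNamesystem#getSrcPathsHash: MD5 over the UTF-8 bytes of
  // every source path, which binds the cookie to the original path set.
  static byte[] srcPathsHash(String[] srcs) throws NoSuchAlgorithmException {
    MessageDigest md5 = MessageDigest.getInstance("MD5");
    for (String src : srcs) {
      md5.update(src.getBytes(StandardCharsets.UTF_8));
    }
    return md5.digest();
  }

  static byte[] encode(String[] srcs, int pathIndex, byte[] lastName)
      throws NoSuchAlgorithmException {
    return BatchedListingKeyProto.newBuilder()
        .setChecksum(ByteString.copyFrom(srcPathsHash(srcs)))
        .setPathIndex(pathIndex)
        .setStartAfter(ByteString.copyFrom(lastName))
        .build()
        .toByteArray();
  }

  static void decode(byte[] cookie) throws InvalidProtocolBufferException {
    BatchedListingKeyProto key = BatchedListingKeyProto.parseFrom(cookie);
    // An empty startAfter means the entry at pathIndex was a file that has
    // already been listed (see FSNamesystem#getBatchedListing).
    System.out.println("resume at srcs[" + key.getPathIndex() + "], after \""
        + key.getStartAfter().toStringUtf8() + "\"");
  }
}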
+ 23 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java

@@ -89,6 +89,7 @@ import org.apache.hadoop.hdfs.inotify.EventBatchList;
 import org.apache.hadoop.hdfs.protocol.AclException;
 import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
+import org.apache.hadoop.hdfs.protocol.BatchedDirectoryListing;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
@@ -107,6 +108,7 @@ import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSLimitException;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
+import org.apache.hadoop.hdfs.protocol.HdfsPartialListing;
 import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
@@ -1179,6 +1181,27 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     return files;
   }
 
+  @Override // ClientProtocol
+  public BatchedDirectoryListing getBatchedListing(
+      String[] srcs,
+      byte[] startAfter,
+      boolean needLocation) throws IOException {
+    checkNNStartup();
+    BatchedDirectoryListing batchedListing =
+        namesystem.getBatchedListing(srcs, startAfter, needLocation);
+    if (batchedListing != null) {
+      metrics.incrGetListingOps();
+      int numEntries = 0;
+      for (HdfsPartialListing partial : batchedListing.getListings()) {
+        if (partial.getPartialListing() != null) {
+          numEntries += partial.getPartialListing().size();
+        }
+      }
+      metrics.incrFilesInGetListingOps(numEntries);
+    }
+    return batchedListing;
+  }
+
   @Override // ClientProtocol
   public HdfsFileStatus getFileInfo(String src) throws IOException {
     checkNNStartup();

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -4556,6 +4556,16 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.batched.ls.limit</name>
+  <value>100</value>
+  <description>
+    Limit the number of source paths that can be passed in a single batched
+    listing call. The value must be greater than zero.
+  </description>
+</property>
+
 <property>
   <name>dfs.ls.limit</name>
   <value>1000</value>
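
The same limit is exposed to code through DFSConfigKeys.DFS_NAMENODE_BATCHED_LISTING_LIMIT, which the new test below uses. A minimal sketch of raising it before starting a NameNode; the value 500 is only an example:

    // Assumes the config key added by this patch backs dfs.batched.ls.limit.
    Configuration conf = new HdfsConfiguration();
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_BATCHED_LISTING_LIMIT, 500);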

+ 35 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/ListingBenchmark.java

@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+
+import java.io.IOException;
+
+/**
+ * Skeleton harness for benchmarking directory listing calls against an
+ * in-process NameNode. The measurement loop is intentionally left empty.
+ */
+public class ListingBenchmark {
+
+  public static void main(String[] args) throws IOException {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(0)
+        .format(true)
+        .build();
+    try {
+      // Placeholder: benchmark code would drive listing RPCs against this NameNode.
+      NameNode nn = cluster.getNameNode();
+    } finally {
+      cluster.shutdown();
+    }
+  }
+}
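
The class above only boots the cluster; the measurement loop itself still needs to be written. One possible shape for it, as a hedged sketch that assumes a pre-populated /bench directory tree (paths are illustrative), is to time one batched call against the equivalent per-directory calls:

    DistributedFileSystem dfs = cluster.getFileSystem();
    List<Path> dirs = Arrays.asList(new Path("/bench/dir0"), new Path("/bench/dir1"));

    long start = System.nanoTime();
    RemoteIterator<PartialListing<FileStatus>> batched =
        dfs.batchedListStatusIterator(dirs);
    while (batched.hasNext()) {
      batched.next().get();   // materialize each partial listing
    }
    long batchedNanos = System.nanoTime() - start;

    start = System.nanoTime();
    for (Path dir : dirs) {
      RemoteIterator<FileStatus> perDir = dfs.listStatusIterator(dir);
      while (perDir.hasNext()) {
        perDir.next();
      }
    }
    long perDirNanos = System.nanoTime() - start;
    System.out.printf("batched=%dns, per-directory=%dns%n", batchedNanos, perDirNanos);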

+ 410 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBatchedListDirectories.java

@@ -0,0 +1,410 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.PartialListing;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.hamcrest.core.StringContains;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the batched listing API.
+ */
+public class TestBatchedListDirectories {
+
+  private static MiniDFSCluster cluster;
+  private static Configuration conf;
+  private static DistributedFileSystem dfs;
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  private static final List<Path> SUBDIR_PATHS = Lists.newArrayList();
+  private static final List<Path> FILE_PATHS = Lists.newArrayList();
+  private static final int FIRST_LEVEL_DIRS = 2;
+  private static final int SECOND_LEVEL_DIRS = 3;
+  private static final int FILES_PER_DIR = 5;
+  private static final Path EMPTY_DIR_PATH = new Path("/emptydir");
+  private static final Path DATA_FILE_PATH = new Path("/datafile");
+  private static final Path INACCESSIBLE_DIR_PATH = new Path("/noperms");
+  private static final Path INACCESSIBLE_FILE_PATH =
+      new Path(INACCESSIBLE_DIR_PATH, "nopermsfile");
+
+  private static Path getSubDirName(int i, int j) {
+    return new Path(String.format("/dir%d/subdir%d", i, j));
+  }
+
+  private static Path getFileName(int i, int j, int k) {
+    Path dirPath = getSubDirName(i, j);
+    return new Path(dirPath, "file" + k);
+  }
+
+  private static void assertSubDirEquals(int i, int j, Path p) {
+    assertTrue(p.toString().startsWith("hdfs://"));
+    Path expected = getSubDirName(i, j);
+    assertEquals("Unexpected subdir name",
+        expected.toString(), p.toUri().getPath());
+  }
+
+  private static void assertFileEquals(int i, int j, int k, Path p) {
+    assertTrue(p.toString().startsWith("hdfs://"));
+    Path expected = getFileName(i, j, k);
+    assertEquals("Unexpected file name",
+        expected.toString(), p.toUri().getPath());
+  }
+
+  private static void loadData() throws Exception {
+    for (int i = 0; i < FIRST_LEVEL_DIRS; i++) {
+      for (int j = 0; j < SECOND_LEVEL_DIRS; j++) {
+        Path dirPath = getSubDirName(i, j);
+        dfs.mkdirs(dirPath);
+        SUBDIR_PATHS.add(dirPath);
+        for (int k = 0; k < FILES_PER_DIR; k++) {
+          Path filePath = getFileName(i, j, k);
+          dfs.create(filePath, (short)1).close();
+          FILE_PATHS.add(filePath);
+        }
+      }
+    }
+    dfs.mkdirs(EMPTY_DIR_PATH);
+    FSDataOutputStream fsout = dfs.create(DATA_FILE_PATH, (short)1);
+    fsout.write(123);
+    fsout.close();
+
+    dfs.mkdirs(INACCESSIBLE_DIR_PATH);
+    dfs.create(INACCESSIBLE_FILE_PATH, (short)1).close();
+    dfs.setPermission(INACCESSIBLE_DIR_PATH, new FsPermission(0000));
+  }
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, 7);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_BATCHED_LISTING_LIMIT,
+        FIRST_LEVEL_DIRS * SECOND_LEVEL_DIRS * FILES_PER_DIR);
+    cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1)
+        .build();
+    dfs = cluster.getFileSystem();
+    loadData();
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  private static List<PartialListing<FileStatus>> getListings(List<Path> paths)
+      throws IOException {
+    List<PartialListing<FileStatus>> returned = Lists.newArrayList();
+    RemoteIterator<PartialListing<FileStatus>> it =
+        dfs.batchedListStatusIterator(paths);
+    while (it.hasNext()) {
+      returned.add(it.next());
+    }
+    return returned;
+  }
+
+  private static List<FileStatus> listingsToStatuses(
+      List<PartialListing<FileStatus>> listings) throws IOException {
+    List<FileStatus> returned = Lists.newArrayList();
+    for (PartialListing<FileStatus> listing : listings) {
+      returned.addAll(listing.get());
+    }
+    return returned;
+  }
+
+  private static List<FileStatus> getStatuses(List<Path> paths)
+      throws IOException {
+    List<PartialListing<FileStatus>> listings = getListings(paths);
+    return listingsToStatuses(listings);
+  }
+
+  @Test
+  public void testEmptyPath() throws Exception {
+    thrown.expect(FileNotFoundException.class);
+    List<Path> paths = Lists.newArrayList();
+    getStatuses(paths);
+  }
+
+  @Test
+  public void testEmptyDir() throws Exception {
+    List<Path> paths = Lists.newArrayList(EMPTY_DIR_PATH);
+    List<PartialListing<FileStatus>> listings = getListings(paths);
+    assertEquals(1, listings.size());
+    PartialListing<FileStatus> listing = listings.get(0);
+    assertEquals(EMPTY_DIR_PATH, listing.getListedPath());
+    assertEquals(0, listing.get().size());
+  }
+
+  @Test
+  public void listOneFile() throws Exception {
+    List<Path> paths = Lists.newArrayList();
+    paths.add(FILE_PATHS.get(0));
+    List<FileStatus> statuses = getStatuses(paths);
+    assertEquals(1, statuses.size());
+    assertFileEquals(0, 0, 0, statuses.get(0).getPath());
+  }
+
+  @Test
+  public void listDoesNotExist() throws Exception {
+    thrown.expect(FileNotFoundException.class);
+    List<Path> paths = Lists.newArrayList();
+    paths.add(new Path("/does/not/exist"));
+    getStatuses(paths);
+  }
+
+  @Test
+  public void listSomeDoNotExist() throws Exception {
+    List<Path> paths = Lists.newArrayList();
+    paths.add(new Path("/does/not/exist"));
+    paths.addAll(SUBDIR_PATHS.subList(0, FIRST_LEVEL_DIRS));
+    paths.add(new Path("/does/not/exist"));
+    paths.addAll(SUBDIR_PATHS.subList(0, FIRST_LEVEL_DIRS));
+    paths.add(new Path("/does/not/exist"));
+    List<PartialListing<FileStatus>> listings = getListings(paths);
+    for (int i = 0; i < listings.size(); i++) {
+      PartialListing<FileStatus> partial = listings.get(i);
+      if (partial.getListedPath().toString().equals("/does/not/exist")) {
+        try {
+          partial.get();
+          fail("Expected exception");
+        } catch (FileNotFoundException e) {
+          assertTrue(e.getMessage().contains("/does/not/exist"));
+        }
+      } else {
+        partial.get();
+      }
+    }
+    try {
+      listings.get(listings.size()-1).get();
+      fail("Expected exception");
+    } catch (FileNotFoundException e) {
+      assertTrue(e.getMessage().contains("/does/not/exist"));
+    }
+  }
+
+  @Test
+  public void listDirRelative() throws Exception {
+    dfs.setWorkingDirectory(new Path("/dir0"));
+    List<Path> paths = Lists.newArrayList(new Path("."));
+    List<FileStatus> statuses = getStatuses(paths);
+    assertEquals("Wrong number of items",
+        SECOND_LEVEL_DIRS, statuses.size());
+    for (int i = 0; i < SECOND_LEVEL_DIRS; i++) {
+      FileStatus stat = statuses.get(i);
+      assertSubDirEquals(0, i, stat.getPath());
+    }
+  }
+
+  @Test
+  public void listFilesRelative() throws Exception {
+    dfs.setWorkingDirectory(new Path("/dir0"));
+    List<Path> paths = Lists.newArrayList(new Path("subdir0"));
+    List<FileStatus> statuses = getStatuses(paths);
+    assertEquals("Wrong number of items",
+        FILES_PER_DIR, statuses.size());
+    for (int i = 0; i < FILES_PER_DIR; i++) {
+      FileStatus stat = statuses.get(i);
+      assertFileEquals(0, 0, i, stat.getPath());
+    }
+  }
+
+  private void listFilesInternal(int numFiles) throws Exception {
+    List<Path> paths = FILE_PATHS.subList(0, numFiles);
+    List<FileStatus> statuses = getStatuses(paths);
+    assertEquals(paths.size(), statuses.size());
+    for (int i = 0; i < paths.size(); i++) {
+      Path p = paths.get(i);
+      FileStatus stat = statuses.get(i);
+      assertEquals(p.toUri().getPath(), stat.getPath().toUri().getPath());
+    }
+  }
+
+  @Test
+  public void listOneFiles() throws Exception {
+    listFilesInternal(1);
+  }
+
+  @Test
+  public void listSomeFiles() throws Exception {
+    listFilesInternal(FILE_PATHS.size() / 2);
+  }
+
+  @Test
+  public void listAllFiles() throws Exception {
+    listFilesInternal(FILE_PATHS.size());
+  }
+
+  private void listDirectoriesInternal(int numDirs) throws Exception {
+    List<Path> paths = SUBDIR_PATHS.subList(0, numDirs);
+    List<PartialListing<FileStatus>> listings = getListings(paths);
+
+    LinkedHashMap<Path, List<FileStatus>> listing = new LinkedHashMap<>();
+    for (PartialListing<FileStatus> partialListing : listings) {
+      Path parent = partialListing.getListedPath();
+      if (!listing.containsKey(parent)) {
+        listing.put(parent, Lists.newArrayList());
+      }
+      listing.get(parent).addAll(partialListing.get());
+    }
+
+    assertEquals(paths.size(), listing.size());
+    int pathIdx = 0;
+    for (Map.Entry<Path, List<FileStatus>> entry : listing.entrySet()) {
+      Path expected = paths.get(pathIdx++);
+      Path parent = entry.getKey();
+      List<FileStatus> children = entry.getValue();
+      assertEquals(expected, parent);
+      assertEquals(FILES_PER_DIR, children.size());
+    }
+  }
+
+  @Test
+  public void listOneDirectory() throws Exception {
+    listDirectoriesInternal(1);
+  }
+
+  @Test
+  public void listSomeDirectories() throws Exception {
+    listDirectoriesInternal(SUBDIR_PATHS.size() / 2);
+  }
+
+  @Test
+  public void listAllDirectories() throws Exception {
+    listDirectoriesInternal(SUBDIR_PATHS.size());
+  }
+
+  @Test
+  public void listTooManyDirectories() throws Exception {
+    thrown.expect(RemoteException.class);
+    thrown.expectMessage(
+        StringContains.containsString("Too many source paths"));
+    List<Path> paths = Lists.newArrayList(FILE_PATHS);
+    paths.add(SUBDIR_PATHS.get(0));
+    getStatuses(paths);
+  }
+
+  @Test
+  public void listDirsAndEmpty() throws Exception {
+    List<Path> paths = Lists.newArrayList();
+    paths.add(EMPTY_DIR_PATH);
+    paths.add(FILE_PATHS.get(0));
+    paths.add(EMPTY_DIR_PATH);
+    List<PartialListing<FileStatus>> listings = getListings(paths);
+    assertEquals(3, listings.size());
+    assertEquals(0, listings.get(0).get().size());
+    assertEquals(1, listings.get(1).get().size());
+    assertEquals(FILE_PATHS.get(0).toString(),
+        listings.get(1).get().get(0).getPath().toUri().getPath());
+    assertEquals(0, listings.get(2).get().size());
+  }
+
+  @Test
+  public void listSamePaths() throws Exception {
+    List<Path> paths = Lists.newArrayList();
+    paths.add(SUBDIR_PATHS.get(0));
+    paths.add(SUBDIR_PATHS.get(0));
+    paths.add(FILE_PATHS.get(0));
+    paths.add(FILE_PATHS.get(0));
+    List<FileStatus> statuses = getStatuses(paths);
+    assertEquals(FILES_PER_DIR*2 + 2, statuses.size());
+    List<FileStatus> slice = statuses.subList(0, FILES_PER_DIR);
+    for (int i = 0; i < FILES_PER_DIR; i++) {
+      assertFileEquals(0, 0, i, slice.get(i).getPath());
+    }
+    slice = statuses.subList(FILES_PER_DIR, FILES_PER_DIR*2);
+    for (int i = 0; i < FILES_PER_DIR; i++) {
+      assertFileEquals(0, 0, i, slice.get(i).getPath());
+    }
+    assertFileEquals(0, 0, 0, statuses.get(FILES_PER_DIR*2).getPath());
+    assertFileEquals(0, 0, 0, statuses.get(FILES_PER_DIR*2+1).getPath());
+  }
+
+  @Test
+  public void listLocatedStatus() throws Exception {
+    List<Path> paths = Lists.newArrayList();
+    paths.add(DATA_FILE_PATH);
+    RemoteIterator<PartialListing<LocatedFileStatus>> it =
+        dfs.batchedListLocatedStatusIterator(paths);
+    PartialListing<LocatedFileStatus> listing = it.next();
+    List<LocatedFileStatus> statuses = listing.get();
+    assertEquals(1, statuses.size());
+    assertTrue(statuses.get(0).getBlockLocations().length > 0);
+  }
+
+  private void listAsNormalUser(List<Path> paths)
+      throws IOException, InterruptedException {
+    final UserGroupInformation ugi = UserGroupInformation
+        .createRemoteUser("tiffany");
+    ugi.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        // perform the listing as a non-privileged remote user
+        FileSystem fs = FileSystem.get(cluster.getURI(), conf);
+        RemoteIterator<PartialListing<FileStatus>> it =
+            fs.batchedListStatusIterator(paths);
+        PartialListing<FileStatus> listing = it.next();
+        listing.get();
+        return null;
+      }
+    });
+  }
+
+  @Test
+  public void listInaccessibleDir() throws Exception {
+    thrown.expect(AccessControlException.class);
+    List<Path> paths = Lists.newArrayList(INACCESSIBLE_DIR_PATH);
+    listAsNormalUser(paths);
+  }
+
+  @Test
+  public void listInaccessibleFile() throws Exception {
+    thrown.expect(AccessControlException.class);
+    List<Path> paths = Lists.newArrayList(INACCESSIBLE_FILE_PATH);
+    listAsNormalUser(paths);
+  }
+}
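
Taken together, these tests document the intended client-side pattern: iterate over the PartialListings and handle per-path failures when calling get(), since errors are reported per listed path rather than failing the whole batch. A small self-contained sketch (paths and the catch clause are illustrative):

    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.PartialListing;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RemoteIterator;

    public class BatchedListingExample {
      public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.get(new Configuration());
        List<Path> paths = Arrays.asList(new Path("/user/a"), new Path("/user/b"));

        RemoteIterator<PartialListing<FileStatus>> it =
            fs.batchedListStatusIterator(paths);
        while (it.hasNext()) {
          PartialListing<FileStatus> partial = it.next();
          try {
            for (FileStatus status : partial.get()) {
              System.out.println(status.getPath());
            }
          } catch (FileNotFoundException e) {
            // A missing path fails only its own entry in the batch.
            System.err.println("Missing path: " + partial.getListedPath());
          }
        }
      }
    }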