Procházet zdrojové kódy

HADOOP-16898. Batch listing of multiple directories via an (unstable) interface

Contributed by Steve Loughran.

This moves the new API of HDFS-13616 into a interface which is implemented by
HDFS RPC filesystem client (not WebHDFS or any other connector)

This new interface, BatchListingOperations, is in hadoop-common,
so applications do not need to be compiled with HDFS on the classpath.
They must cast the FS into the interface.

instanceof can probe the client for having the new interface -the patch
also adds a new path capability to probe for this.

The FileSystem implementation is cut; tests updated as appropriate.

All new interfaces/classes/constants are marked as @unstable.

Change-Id: I5623c51f2c75804f58f915dd7e60cb2cffdac681
Steve Loughran před 5 roky
rodič
revize
c734d69a55

+ 61 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BatchListingOperations.java

@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Interface filesystems MAY implement to offer a batched list.
+ * If implemented, filesystems SHOULD declare
+ * {@link CommonPathCapabilities#FS_EXPERIMENTAL_BATCH_LISTING} to be a supported
+ * path capability.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface BatchListingOperations {
+
+  /**
+   * Batched listing API that returns {@link PartialListing}s for the
+   * passed Paths.
+   *
+   * @param paths List of paths to list.
+   * @return RemoteIterator that returns corresponding PartialListings.
+   * @throws IOException failure
+   */
+  RemoteIterator<PartialListing<FileStatus>> batchedListStatusIterator(
+      List<Path> paths) throws IOException;
+
+  /**
+   * Batched listing API that returns {@link PartialListing}s for the passed
+   * Paths. The PartialListing will contain {@link LocatedFileStatus} entries
+   * with locations.
+   *
+   * @param paths List of paths to list.
+   * @return RemoteIterator that returns corresponding PartialListings.
+   * @throws IOException failure
+   */
+  RemoteIterator<PartialListing<LocatedFileStatus>>
+      batchedListLocatedStatusIterator(
+          List<Path> paths) throws IOException;
+
+}

+ 8 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java

@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.fs;
 
+import org.apache.hadoop.classification.InterfaceStability;
+
 /**
  * Common path capabilities.
  */
@@ -123,4 +125,10 @@ public final class CommonPathCapabilities {
    */
   public static final String FS_XATTRS = "fs.capability.paths.xattrs";
 
+  /**
+   * Probe for support for {@link BatchListingOperations}.
+   */
+  @InterfaceStability.Unstable
+  public static final String FS_EXPERIMENTAL_BATCH_LISTING =
+      "fs.capability.batch.listing";
 }

+ 0 - 27
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java

@@ -2227,33 +2227,6 @@ public abstract class FileSystem extends Configured
     return new DirListingIterator<>(p);
   }
 
-  /**
-   * Batched listing API that returns {@link PartialListing}s for the
-   * passed Paths.
-   *
-   * @param paths List of paths to list.
-   * @return RemoteIterator that returns corresponding PartialListings.
-   * @throws IOException
-   */
-  public RemoteIterator<PartialListing<FileStatus>> batchedListStatusIterator(
-      final List<Path> paths) throws IOException {
-    throw new UnsupportedOperationException("Not implemented");
-  }
-
-  /**
-   * Batched listing API that returns {@link PartialListing}s for the passed
-   * Paths. The PartialListing will contain {@link LocatedFileStatus} entries
-   * with locations.
-   *
-   * @param paths List of paths to list.
-   * @return RemoteIterator that returns corresponding PartialListings.
-   * @throws IOException
-   */
-  public RemoteIterator<PartialListing<LocatedFileStatus>> batchedListLocatedStatusIterator(
-      final List<Path> paths) throws IOException {
-    throw new UnsupportedOperationException("Not implemented");
-  }
-
   /**
    * List the statuses and block locations of the files in the given path.
    * Does not guarantee to return the iterator that traverses statuses

+ 1 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PartialListing.java

@@ -35,7 +35,7 @@ import java.util.List;
  * {@link #get()} will throw an Exception if there was a failure.
  */
 @InterfaceAudience.Public
-@InterfaceStability.Stable
+@InterfaceStability.Unstable
 public class PartialListing<T extends FileStatus> {
   private final Path listedPath;
   private final List<T> partialListing;

+ 0 - 5
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java

@@ -27,7 +27,6 @@ import java.lang.reflect.Modifier;
 import java.net.URI;
 import java.util.EnumSet;
 import java.util.Iterator;
-import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
@@ -106,10 +105,6 @@ public class TestFilterFileSystem {
     public FileStatus[] listStatusBatch(Path f, byte[] token);
     public FileStatus[] listStatus(Path[] files);
     public FileStatus[] listStatus(Path[] files, PathFilter filter);
-    public RemoteIterator<PartialListing<LocatedFileStatus>> batchedListLocatedStatusIterator(
-        final List<Path> paths) throws IOException;
-    public RemoteIterator<PartialListing<FileStatus>> batchedListStatusIterator(
-        final List<Path> paths) throws IOException;
     public FileStatus[] globStatus(Path pathPattern);
     public FileStatus[] globStatus(Path pathPattern, PathFilter filter);
     public Iterator<LocatedFileStatus> listFiles(Path path,

+ 0 - 4
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java

@@ -125,10 +125,6 @@ public class TestHarFileSystem {
     public FileStatus[] listStatusBatch(Path f, byte[] token);
     public FileStatus[] listStatus(Path[] files);
     public FileStatus[] listStatus(Path[] files, PathFilter filter);
-    public RemoteIterator<PartialListing<LocatedFileStatus>> batchedListLocatedStatusIterator(
-        final List<Path> paths) throws IOException;
-    public RemoteIterator<PartialListing<FileStatus>> batchedListStatusIterator(
-        final List<Path> paths) throws IOException;
     public FileStatus[] globStatus(Path pathPattern);
     public FileStatus[] globStatus(Path pathPattern, PathFilter filter);
 

+ 14 - 1
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java

@@ -29,9 +29,11 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
+import org.apache.hadoop.fs.BatchListingOperations;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.BlockStoragePolicySpi;
 import org.apache.hadoop.fs.CacheFlag;
+import org.apache.hadoop.fs.CommonPathCapabilities;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -129,6 +131,8 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Optional;
 
+import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
+
 /****************************************************************
  * Implementation of the abstract FileSystem for the DFS system.
  * This object is the way end-user code interacts with a Hadoop
@@ -138,7 +142,7 @@ import java.util.Optional;
 @InterfaceAudience.LimitedPrivate({ "MapReduce", "HBase" })
 @InterfaceStability.Unstable
 public class DistributedFileSystem extends FileSystem
-    implements KeyProviderTokenIssuer {
+    implements KeyProviderTokenIssuer, BatchListingOperations {
   private Path workingDir;
   private URI uri;
 
@@ -3575,6 +3579,15 @@ public class DistributedFileSystem extends FileSystem
     if (cap.isPresent()) {
       return cap.get();
     }
+    // this switch is for features which are in the DFS client but not
+    // (yet/ever) in the WebHDFS API.
+    switch (validatePathCapabilityArgs(path, capability)) {
+    case CommonPathCapabilities.FS_EXPERIMENTAL_BATCH_LISTING:
+      return true;
+    default:
+      // fall through
+    }
+
     return super.hasPathCapability(p, capability);
   }
 }

+ 10 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBatchedListDirectories.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
 
 import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonPathCapabilities;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -253,6 +254,13 @@ public class TestBatchedListDirectories {
     }
   }
 
+  @Test
+  public void testDFSHasCapability() throws Throwable {
+    assertTrue("FS does not declare PathCapability support",
+        dfs.hasPathCapability(new Path("/"),
+            CommonPathCapabilities.FS_EXPERIMENTAL_BATCH_LISTING));
+  }
+
   private void listFilesInternal(int numFiles) throws Exception {
     List<Path> paths = FILE_PATHS.subList(0, numFiles);
     List<FileStatus> statuses = getStatuses(paths);
@@ -384,7 +392,8 @@ public class TestBatchedListDirectories {
       @Override
       public Void run() throws Exception {
         // try renew with long name
-        FileSystem fs = FileSystem.get(cluster.getURI(), conf);
+        DistributedFileSystem fs = (DistributedFileSystem)
+            FileSystem.get(cluster.getURI(), conf);
         RemoteIterator<PartialListing<FileStatus>> it =
             fs.batchedListStatusIterator(paths);
         PartialListing<FileStatus> listing = it.next();