
HDFS-14454. RBF: getContentSummary() should allow non-existing folders. Contributed by Inigo Goiri.

Ayush Saxena 6 years ago
parent
commit
97b672d440

+ 84 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteResult.java

@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.io.IOException;
+
+/**
+ * Result from a remote location.
+ * It includes the exception if there was any error.
+ * @param <T> Type of the remote location.
+ * @param <R> Type of the result.
+ */
+public class RemoteResult<T extends RemoteLocationContext, R> {
+  /** The remote location. */
+  private final T loc;
+  /** The result from the remote location. */
+  private final R result;
+  /** If the result is set; used for void types. */
+  private final boolean resultSet;
+  /** The exception if we couldn't get the result. */
+  private final IOException ioe;
+
+  public RemoteResult(T location, R r) {
+    this.loc = location;
+    this.result = r;
+    this.resultSet = true;
+    this.ioe = null;
+  }
+
+  public RemoteResult(T location, IOException e) {
+    this.loc = location;
+    this.result = null;
+    this.resultSet = false;
+    this.ioe = e;
+  }
+
+  public T getLocation() {
+    return loc;
+  }
+
+  public boolean hasResult() {
+    return resultSet;
+  }
+
+  public R getResult() {
+    return result;
+  }
+
+  public boolean hasException() {
+    return getException() != null;
+  }
+
+  public IOException getException() {
+    return ioe;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder()
+        .append("loc=").append(getLocation());
+    if (hasResult()) {
+      sb.append(" result=").append(getResult());
+    }
+    if (hasException()) {
+      sb.append(" exception=").append(getException());
+    }
+    return sb.toString();
+  }
+}

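As a minimal, hedged sketch of the new class in use, the following mirrors the error-handling pattern the Router code below adopts. It is illustrative only: the class name, locations, and summary values are fabricated, and RemoteLocation's three-argument (nameservice, destination, source) constructor is an assumption.

```java
// Illustrative only: a self-contained consumer of the RemoteResult API
// added above. Locations and summary values are fabricated.
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.router.RemoteResult;

public class RemoteResultExample {
  public static void main(String[] args) throws IOException {
    // Assumed constructor: (nameservice, destination, source)
    RemoteLocation ns0 = new RemoteLocation("ns0", "/data", "/data");
    RemoteLocation ns1 = new RemoteLocation("ns1", "/data", "/data");

    // One result per subcluster: ns0 answered, ns1 reported a missing folder
    List<RemoteResult<RemoteLocation, ContentSummary>> results =
        Arrays.asList(
            new RemoteResult<>(ns0,
                new ContentSummary.Builder().fileCount(10).build()),
            new RemoteResult<>(ns1, new FileNotFoundException("/data")));

    long files = 0;
    for (RemoteResult<RemoteLocation, ContentSummary> result : results) {
      if (result.hasException()) {
        IOException ioe = result.getException();
        if (!(ioe instanceof FileNotFoundException)) {
          throw ioe; // only missing folders are tolerated
        }
      } else if (result.getResult() != null) {
        files += result.getResult().getFileCount();
      }
    }
    System.out.println("Aggregated file count: " + files); // 10
  }
}
```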
+ 39 - 26
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java

@@ -728,9 +728,9 @@ public class RouterClientProtocol implements ClientProtocol {
     RemoteMethod method = new RemoteMethod("getListing",
         new Class<?>[] {String.class, startAfter.getClass(), boolean.class},
         new RemoteParam(), startAfter, needLocation);
-    Map<RemoteLocation, DirectoryListing> listings =
-        rpcClient.invokeConcurrent(locations, method,
-            !this.allowPartialList, false, DirectoryListing.class);
+    final List<RemoteResult<RemoteLocation, DirectoryListing>> listings =
+        rpcClient.invokeConcurrent(
+            locations, method, false, -1, DirectoryListing.class);
 
     Map<String, HdfsFileStatus> nnListing = new TreeMap<>();
     int totalRemainingEntries = 0;
@@ -739,13 +739,17 @@ public class RouterClientProtocol implements ClientProtocol {
     if (listings != null) {
       // Check the subcluster listing with the smallest name
       String lastName = null;
-      for (Map.Entry<RemoteLocation, DirectoryListing> entry :
-          listings.entrySet()) {
-        RemoteLocation location = entry.getKey();
-        DirectoryListing listing = entry.getValue();
-        if (listing == null) {
-          LOG.debug("Cannot get listing from {}", location);
-        } else {
+      for (RemoteResult<RemoteLocation, DirectoryListing> result : listings) {
+        if (result.hasException()) {
+          IOException ioe = result.getException();
+          if (ioe instanceof FileNotFoundException) {
+            RemoteLocation location = result.getLocation();
+            LOG.debug("Cannot get listing from {}", location);
+          } else if (!allowPartialList) {
+            throw ioe;
+          }
+        } else if (result.getResult() != null) {
+          DirectoryListing listing = result.getResult();
           totalRemainingEntries += listing.getRemainingEntries();
           HdfsFileStatus[] partialListing = listing.getPartialListing();
           int length = partialListing.length;
@@ -760,13 +764,14 @@ public class RouterClientProtocol implements ClientProtocol {
       }
 
       // Add existing entries
-      for (Object value : listings.values()) {
-        DirectoryListing listing = (DirectoryListing) value;
+      for (RemoteResult<RemoteLocation, DirectoryListing> result : listings) {
+        DirectoryListing listing = result.getResult();
         if (listing != null) {
           namenodeListingExists = true;
           for (HdfsFileStatus file : listing.getPartialListing()) {
             String filename = file.getLocalName();
-            if (totalRemainingEntries > 0 && filename.compareTo(lastName) > 0) {
+            if (totalRemainingEntries > 0 &&
+                filename.compareTo(lastName) > 0) {
               // Discarding entries further than the lastName
               remainingEntries++;
             } else {
@@ -1110,19 +1115,26 @@ public class RouterClientProtocol implements ClientProtocol {
     rpcServer.checkOperation(NameNode.OperationCategory.READ);
 
     // Get the summaries from regular files
-    Collection<ContentSummary> summaries = new LinkedList<>();
+    final Collection<ContentSummary> summaries = new ArrayList<>();
+    final List<RemoteLocation> locations =
+        rpcServer.getLocationsForPath(path, false);
+    final RemoteMethod method = new RemoteMethod("getContentSummary",
+        new Class<?>[] {String.class}, new RemoteParam());
+    final List<RemoteResult<RemoteLocation, ContentSummary>> results =
+        rpcClient.invokeConcurrent(locations, method,
+            false, -1, ContentSummary.class);
     FileNotFoundException notFoundException = null;
-    try {
-      final List<RemoteLocation> locations =
-          rpcServer.getLocationsForPath(path, false);
-      RemoteMethod method = new RemoteMethod("getContentSummary",
-          new Class<?>[] {String.class}, new RemoteParam());
-      Map<RemoteLocation, ContentSummary> results =
-          rpcClient.invokeConcurrent(locations, method,
-              !this.allowPartialList, false, ContentSummary.class);
-      summaries.addAll(results.values());
-    } catch (FileNotFoundException e) {
-      notFoundException = e;
+    for (RemoteResult<RemoteLocation, ContentSummary> result : results) {
+      if (result.hasException()) {
+        IOException ioe = result.getException();
+        if (ioe instanceof FileNotFoundException) {
+          notFoundException = (FileNotFoundException)ioe;
+        } else if (!allowPartialList) {
+          throw ioe;
+        }
+      } else if (result.getResult() != null) {
+        summaries.add(result.getResult());
+      }
     }
 
     // Add mount points at this level in the tree
@@ -1131,7 +1143,8 @@ public class RouterClientProtocol implements ClientProtocol {
       for (String child : children) {
         Path childPath = new Path(path, child);
         try {
-          ContentSummary mountSummary = getContentSummary(childPath.toString());
+          ContentSummary mountSummary = getContentSummary(
+              childPath.toString());
           if (mountSummary != null) {
             summaries.add(mountSummary);
           }

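The rethrow decisions above hinge on the allowPartialList flag. Below is a small hedged sketch of the Router configuration that drives it, based on the key the tests below set; the wrapper class and method names are hypothetical.

```java
// Sketch under stated assumptions: RouterConfigBuilder is the RBF test
// helper used in the tests below; DFS_ROUTER_ALLOW_PARTIAL_LIST is the
// RBFConfigKeys key they set. The wrapper class itself is hypothetical.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;

public class PartialListConfigExample {
  public static Configuration strictRouterConf() {
    Configuration conf = new RouterConfigBuilder()
        .stateStore()
        .admin()
        .rpc()
        .build();
    // With partial results disabled, any remote failure other than a
    // FileNotFoundException aborts getListing()/getContentSummary().
    conf.setBoolean(RBFConfigKeys.DFS_ROUTER_ALLOW_PARTIAL_LIST, false);
    return conf;
  }
}
```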
+ 56 - 23
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java

@@ -1083,11 +1083,58 @@ public class RouterRpcClient {
   * @throws IOException If requireResponse=true and any of the calls throw an
    *           exception.
    */
-  @SuppressWarnings("unchecked")
   public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
       final Collection<T> locations, final RemoteMethod method,
       boolean requireResponse, boolean standby, long timeOutMs, Class<R> clazz)
           throws IOException {
+    final List<RemoteResult<T, R>> results = invokeConcurrent(
+        locations, method, standby, timeOutMs, clazz);
+
+    final Map<T, R> ret = new TreeMap<>();
+    for (final RemoteResult<T, R> result : results) {
+      // Response from all servers required, use this error.
+      if (requireResponse && result.hasException()) {
+        throw result.getException();
+      }
+      if (result.hasResult()) {
+        ret.put(result.getLocation(), result.getResult());
+      }
+    }
+
+    // Throw the exception for the first location if there are no results
+    if (ret.isEmpty()) {
+      final RemoteResult<T, R> result = results.get(0);
+      if (result.hasException()) {
+        throw result.getException();
+      }
+    }
+
+    return ret;
+  }
+
+  /**
+   * Invokes multiple concurrent proxy calls to different clients. Returns an
+   * array of results.
+   *
+   * Re-throws exceptions generated by the remote RPC call as either
+   * RemoteException or IOException.
+   *
+   * @param <T> The type of the remote location.
+   * @param <R> The type of the remote method return value.
+   * @param locations List of remote locations to call concurrently.
+   * @param method The remote method and parameters to invoke.
+   * @param standby If the requests should go to the standby namenodes too.
+   * @param timeOutMs Timeout for each individual call.
+   * @param clazz Type of the remote return type.
+   * @return Result of invoking the method per subcluster (list of results).
+   *         This includes the exception for each remote location.
+   * @throws IOException If there are errors invoking the method.
+   */
+  @SuppressWarnings("unchecked")
+  public <T extends RemoteLocationContext, R> List<RemoteResult<T, R>>
+      invokeConcurrent(final Collection<T> locations,
+          final RemoteMethod method, boolean standby, long timeOutMs,
+          Class<R> clazz) throws IOException {
 
     final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
     final Method m = method.getMethod();
@@ -1103,8 +1150,9 @@ public class RouterRpcClient {
       try {
         Class<?> proto = method.getProtocol();
         Object[] paramList = method.getParams(location);
-        Object result = invokeMethod(ugi, namenodes, proto, m, paramList);
-        return Collections.singletonMap(location, (R) result);
+        R result = (R) invokeMethod(ugi, namenodes, proto, m, paramList);
+        RemoteResult<T, R> remoteResult = new RemoteResult<>(location, result);
+        return Collections.singletonList(remoteResult);
       } catch (IOException ioe) {
         // Localize the exception
         throw processException(ioe, location);
@@ -1151,21 +1199,20 @@ public class RouterRpcClient {
       } else {
         futures = executorService.invokeAll(callables);
       }
-      Map<T, R> results = new TreeMap<>();
-      Map<T, IOException> exceptions = new TreeMap<>();
+      List<RemoteResult<T, R>> results = new ArrayList<>();
       for (int i=0; i<futures.size(); i++) {
         T location = orderedLocations.get(i);
         try {
           Future<Object> future = futures.get(i);
-          Object result = future.get();
-          results.put(location, (R) result);
+          R result = (R) future.get();
+          results.add(new RemoteResult<>(location, result));
         } catch (CancellationException ce) {
           T loc = orderedLocations.get(i);
           String msg = "Invocation to \"" + loc + "\" for \""
               + method.getMethodName() + "\" timed out";
           LOG.error(msg);
           IOException ioe = new SubClusterTimeoutException(msg);
-          exceptions.put(location, ioe);
+          results.add(new RemoteResult<>(location, ioe));
         } catch (ExecutionException ex) {
           Throwable cause = ex.getCause();
           LOG.debug("Canot execute {} in {}: {}",
@@ -1180,22 +1227,8 @@ public class RouterRpcClient {
                 m.getName() + ": " + cause.getMessage(), cause);
           }
 
-          // Response from all servers required, use this error.
-          if (requireResponse) {
-            throw ioe;
-          }
-
           // Store the exceptions
-          exceptions.put(location, ioe);
-        }
-      }
-
-      // Throw the exception for the first location if there are no results
-      if (results.isEmpty()) {
-        T location = orderedLocations.get(0);
-        IOException ioe = exceptions.get(location);
-        if (ioe != null) {
-          throw ioe;
+          results.add(new RemoteResult<>(location, ioe));
         }
       }
 

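For callers choosing between the two overloads, here is a hedged comparison; the signatures are taken from this diff, while the example class is hypothetical and stands in for Router-side code.

```java
// Sketch only: placed in the router package so the diff's public types
// resolve; rpcClient and locations would be Router fields in real code.
package org.apache.hadoop.hdfs.server.federation.router;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;

public class InvokeConcurrentExample {
  void invokeBothWays(RouterRpcClient rpcClient,
      List<RemoteLocation> locations) throws IOException {
    RemoteMethod method = new RemoteMethod("getContentSummary",
        new Class<?>[] {String.class}, new RemoteParam());

    // Legacy overload: map of successful results; requireResponse=true
    // rethrows the first per-location exception instead.
    Map<RemoteLocation, ContentSummary> byLocation =
        rpcClient.invokeConcurrent(locations, method,
            true, false, -1, ContentSummary.class);

    // New overload: one RemoteResult per location carrying either the
    // value or the IOException, so the caller decides what is fatal.
    List<RemoteResult<RemoteLocation, ContentSummary>> perLocation =
        rpcClient.invokeConcurrent(locations, method,
            false, -1, ContentSummary.class);

    System.out.println(byLocation.size() + " successes out of "
        + perLocation.size() + " locations");
  }
}
```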
+ 128 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.federation;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
@@ -31,8 +32,14 @@ import java.io.InputStreamReader;
 import java.lang.management.ManagementFactory;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 
 import javax.management.JMX;
@@ -40,6 +47,7 @@ import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -48,12 +56,18 @@ import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
 import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
 import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
 import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
 import org.apache.hadoop.hdfs.server.federation.router.ConnectionManager;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
 import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
 import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@@ -61,6 +75,12 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
 import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
 import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.security.AccessControlException;
@@ -412,4 +432,112 @@ public final class FederationTestUtils {
           listNamenodeContext.get(index).getNamenodeId());
     }
   }
+
+  /**
+   * Get an HDFS file system connected to the given RPC port.
+   * @param rpcPort RPC port.
+   * @return HDFS file system.
+   * @throws IOException If it cannot create the file system.
+   */
+  public static FileSystem getFileSystem(int rpcPort) throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    URI uri = URI.create("hdfs://localhost:" + rpcPort);
+    return DistributedFileSystem.get(uri, conf);
+  }
+
+  /**
+   * Get an HDFS file system for a Router.
+   * @param router Router.
+   * @return HDFS file system.
+   * @throws IOException If it cannot create the file system.
+   */
+  public static FileSystem getFileSystem(final Router router)
+      throws IOException {
+    InetSocketAddress rpcAddress = router.getRpcServerAddress();
+    int rpcPort = rpcAddress.getPort();
+    return getFileSystem(rpcPort);
+  }
+
+  /**
+   * Get the admin interface for a Router.
+   * @param router Router.
+   * @return Admin interface.
+   * @throws IOException If it cannot create the admin interface.
+   */
+  public static RouterClient getAdminClient(
+      final Router router) throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    InetSocketAddress routerSocket = router.getAdminServerAddress();
+    return new RouterClient(routerSocket, conf);
+  }
+
+  /**
+   * Add a mount table entry in some name services and wait until it is
+   * available.
+   * @param router Router used to add the entry.
+   * @param mountPoint Name of the mount point.
+   * @param order Order of the mount table entry.
+   * @param nsIds Name service identifiers.
+   * @throws Exception If the entry could not be created.
+   */
+  public static void createMountTableEntry(
+      final Router router,
+      final String mountPoint, final DestinationOrder order,
+      Collection<String> nsIds) throws Exception {
+    createMountTableEntry(
+        Collections.singletonList(router), mountPoint, order, nsIds);
+  }
+
+  /**
+   * Add a mount table entry in some name services and wait until it is
+   * available.
+   * @param routers List of routers.
+   * @param mountPoint Name of the mount point.
+   * @param order Order of the mount table entry.
+   * @param nsIds Name service identifiers.
+   * @throws Exception If the entry could not be created.
+   */
+  public static void createMountTableEntry(
+      final List<Router> routers,
+      final String mountPoint,
+      final DestinationOrder order,
+      final Collection<String> nsIds) throws Exception {
+    Router router = routers.get(0);
+    RouterClient admin = getAdminClient(router);
+    MountTableManager mountTable = admin.getMountTableManager();
+    Map<String, String> destMap = new HashMap<>();
+    for (String nsId : nsIds) {
+      destMap.put(nsId, mountPoint);
+    }
+    MountTable newEntry = MountTable.newInstance(mountPoint, destMap);
+    newEntry.setDestOrder(order);
+    AddMountTableEntryRequest addRequest =
+        AddMountTableEntryRequest.newInstance(newEntry);
+    AddMountTableEntryResponse addResponse =
+        mountTable.addMountTableEntry(addRequest);
+    boolean created = addResponse.getStatus();
+    assertTrue(created);
+
+    refreshRoutersCaches(routers);
+
+    // Check for the path
+    GetMountTableEntriesRequest getRequest =
+        GetMountTableEntriesRequest.newInstance(mountPoint);
+    GetMountTableEntriesResponse getResponse =
+        mountTable.getMountTableEntries(getRequest);
+    List<MountTable> entries = getResponse.getEntries();
+    assertEquals("Too many entries: " + entries, 1, entries.size());
+    assertEquals(mountPoint, entries.get(0).getSourcePath());
+  }
+
+  /**
+   * Refresh the caches of a set of Routers.
+   * @param routers List of Routers.
+   */
+  public static void refreshRoutersCaches(final List<Router> routers) {
+    for (final Router router : routers) {
+      StateStoreService stateStore = router.getStateStore();
+      stateStore.refreshCaches(true);
+    }
+  }
 }

+ 109 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java

@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.federation;
 
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singletonList;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
@@ -31,14 +33,19 @@ import java.io.IOException;
 import java.net.ConnectException;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
 import java.util.SortedMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FsServerDefaults;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.ha.HAServiceStatus;
@@ -67,6 +74,9 @@ import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
 import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
@@ -311,6 +321,9 @@ public class MockNamenode {
     when(l).thenAnswer(invocation -> {
       String src = getSrc(invocation);
       LOG.info("{} getListing({})", nsId, src);
+      if (fs.get(src) == null) {
+        throw new FileNotFoundException("File does not exist " + src);
+      }
       if (!src.endsWith("/")) {
         src += "/";
       }
@@ -338,6 +351,15 @@ public class MockNamenode {
     when(c).thenAnswer(invocation -> {
       String src = getSrc(invocation);
       LOG.info("{} create({})", nsId, src);
+      boolean createParent = (boolean)invocation.getArgument(4);
+      if (createParent) {
+        Path path = new Path(src).getParent();
+        while (!path.isRoot()) {
+          LOG.info("{} create parent {}", nsId, path);
+          fs.put(path.toString(), "DIRECTORY");
+          path = path.getParent();
+        }
+      }
       fs.put(src, "FILE");
       return getMockHdfsFileStatus(src, "FILE");
     });
@@ -375,6 +397,15 @@ public class MockNamenode {
     when(m).thenAnswer(invocation -> {
       String src = getSrc(invocation);
       LOG.info("{} mkdirs({})", nsId, src);
+      boolean createParent = (boolean)invocation.getArgument(2);
+      if (createParent) {
+        Path path = new Path(src).getParent();
+        while (!path.isRoot()) {
+          LOG.info("{} mkdir parent {}", nsId, path);
+          fs.put(path.toString(), "DIRECTORY");
+          path = path.getParent();
+        }
+      }
       fs.put(src, "DIRECTORY");
       return true;
     });
@@ -386,6 +417,39 @@ public class MockNamenode {
       when(defaults.getKeyProviderUri()).thenReturn(nsId);
       return defaults;
     });
+    when(mockNn.getContentSummary(anyString())).thenAnswer(invocation -> {
+      String src = getSrc(invocation);
+      LOG.info("{} getContentSummary({})", nsId, src);
+      if (fs.get(src) == null) {
+        throw new FileNotFoundException("File does not exist " + src);
+      }
+      if (!src.endsWith("/")) {
+        src += "/";
+      }
+      Map<String, String> files =
+          fs.subMap(src, src + Character.MAX_VALUE);
+      int numFiles = 0;
+      int numDirs = 0;
+      int length = 0;
+      for (Entry<String, String> entry : files.entrySet()) {
+        String file = entry.getKey();
+        if (file.substring(src.length()).indexOf('/') < 0) {
+          String type = entry.getValue();
+          if ("DIRECTORY".equals(type)) {
+            numDirs++;
+          } else if ("FILE".equals(type)) {
+            numFiles++;
+            length += 100;
+          }
+        }
+      }
+      return new ContentSummary.Builder()
+          .fileCount(numFiles)
+          .directoryCount(numDirs)
+          .length(length)
+          .erasureCodingPolicy("")
+          .build();
+    });
   }
 
   private static String getSrc(InvocationOnMock invocation) {
@@ -445,4 +509,49 @@ public class MockNamenode {
     when(lb.getBlockToken()).thenReturn(tok);
     return lb;
   }
+
+  /**
+   * Register a set of NameNodes in a Router.
+   * @param router Router to register to.
+   * @param namenodes Set of NameNodes.
+   * @throws IOException If it cannot register them.
+   */
+  public static void registerSubclusters(Router router,
+      Collection<MockNamenode> namenodes) throws IOException {
+    registerSubclusters(singletonList(router), namenodes, emptySet());
+  }
+
+  /**
+   * Register a set of NameNodes in a set of Routers.
+   * @param routers List of Routers.
+   * @param namenodes Set of NameNodes.
+   * @param unavailableSubclusters Set of unavailable subclusters.
+   * @throws IOException If it cannot register them.
+   */
+  public static void registerSubclusters(List<Router> routers,
+      Collection<MockNamenode> namenodes,
+      Set<String> unavailableSubclusters) throws IOException {
+
+    for (final Router router : routers) {
+      MembershipNamenodeResolver resolver =
+          (MembershipNamenodeResolver) router.getNamenodeResolver();
+      for (final MockNamenode nn : namenodes) {
+        String nsId = nn.getNameserviceId();
+        String rpcAddress = "localhost:" + nn.getRPCPort();
+        String httpAddress = "localhost:" + nn.getHTTPPort();
+        NamenodeStatusReport report = new NamenodeStatusReport(
+            nsId, null, rpcAddress, rpcAddress, rpcAddress, httpAddress);
+        if (unavailableSubclusters.contains(nsId)) {
+          LOG.info("Register {} as UNAVAILABLE", nsId);
+          report.setRegistrationValid(false);
+        } else {
+          LOG.info("Register {} as ACTIVE", nsId);
+          report.setRegistrationValid(true);
+        }
+        report.setNamespaceInfo(new NamespaceInfo(0, nsId, nsId, 0));
+        resolver.registerNamenode(report);
+      }
+      resolver.loadCache(true);
+    }
+  }
 }

+ 72 - 114
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFaultTolerant.java

@@ -18,6 +18,11 @@
 package org.apache.hadoop.hdfs.server.federation.router;
 
 import static java.util.Arrays.asList;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createMountTableEntry;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.getAdminClient;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.getFileSystem;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.refreshRoutersCaches;
+import static org.apache.hadoop.hdfs.server.federation.MockNamenode.registerSubclusters;
 import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -26,8 +31,8 @@ import static org.junit.Assert.fail;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.URI;
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -36,21 +41,21 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.federation.MockNamenode;
 import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
@@ -59,17 +64,12 @@ import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
 import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
-import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
 import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
-import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
-import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
-import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
 import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
-import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.After;
@@ -150,7 +150,8 @@ public class TestRouterFaultTolerant {
     }
 
     LOG.info("Registering the subclusters in the Routers");
-    registerSubclusters(Collections.singleton("ns1"));
+    registerSubclusters(
+        routers, namenodes.values(), Collections.singleton("ns1"));
 
     LOG.info("Stop ns1 to simulate an unavailable subcluster");
     namenodes.get("ns1").stop();
@@ -158,36 +159,6 @@ public class TestRouterFaultTolerant {
     service = Executors.newFixedThreadPool(10);
   }
 
-  /**
-   * Register the subclusters in all Routers.
-   * @param unavailableSubclusters Set of unavailable subclusters.
-   * @throws IOException If it cannot register a subcluster.
-   */
-  private void registerSubclusters(Set<String> unavailableSubclusters)
-      throws IOException {
-    for (final Router router : routers) {
-      MembershipNamenodeResolver resolver =
-          (MembershipNamenodeResolver) router.getNamenodeResolver();
-      for (final MockNamenode nn : namenodes.values()) {
-        String nsId = nn.getNameserviceId();
-        String rpcAddress = "localhost:" + nn.getRPCPort();
-        String httpAddress = "localhost:" + nn.getHTTPPort();
-        NamenodeStatusReport report = new NamenodeStatusReport(
-            nsId, null, rpcAddress, rpcAddress, rpcAddress, httpAddress);
-        if (unavailableSubclusters.contains(nsId)) {
-          LOG.info("Register {} as UNAVAILABLE", nsId);
-          report.setRegistrationValid(false);
-        } else {
-          LOG.info("Register {} as ACTIVE", nsId);
-          report.setRegistrationValid(true);
-        }
-        report.setNamespaceInfo(new NamespaceInfo(0, nsId, nsId, 0));
-        resolver.registerNamenode(report);
-      }
-      resolver.loadCache(true);
-    }
-  }
-
   @After
   public void cleanup() throws Exception {
     LOG.info("Stopping the cluster");
@@ -205,45 +176,6 @@ public class TestRouterFaultTolerant {
     }
   }
 
-  /**
-   * Add a mount table entry in some name services and wait until it is
-   * available.
-   * @param mountPoint Name of the mount point.
-   * @param order Order of the mount table entry.
-   * @param nsIds Name service identifiers.
-   * @throws Exception If the entry could not be created.
-   */
-  private void createMountTableEntry(
-      final String mountPoint, final DestinationOrder order,
-      Collection<String> nsIds) throws Exception {
-    Router router = getRandomRouter();
-    RouterClient admin = getAdminClient(router);
-    MountTableManager mountTable = admin.getMountTableManager();
-    Map<String, String> destMap = new HashMap<>();
-    for (String nsId : nsIds) {
-      destMap.put(nsId, mountPoint);
-    }
-    MountTable newEntry = MountTable.newInstance(mountPoint, destMap);
-    newEntry.setDestOrder(order);
-    AddMountTableEntryRequest addRequest =
-        AddMountTableEntryRequest.newInstance(newEntry);
-    AddMountTableEntryResponse addResponse =
-        mountTable.addMountTableEntry(addRequest);
-    boolean created = addResponse.getStatus();
-    assertTrue(created);
-
-    refreshRoutersCaches();
-
-    // Check for the path
-    GetMountTableEntriesRequest getRequest =
-        GetMountTableEntriesRequest.newInstance(mountPoint);
-    GetMountTableEntriesResponse getResponse =
-        mountTable.getMountTableEntries(getRequest);
-    List<MountTable> entries = getResponse.getEntries();
-    assertEquals("Too many entries: " + entries, 1, entries.size());
-    assertEquals(mountPoint, entries.get(0).getSourcePath());
-  }
-
   /**
    * Update a mount table entry to be fault tolerant.
    * @param mountPoint Mount point to update.
@@ -266,17 +198,7 @@ public class TestRouterFaultTolerant {
         mountTable.updateMountTableEntry(updateRequest);
     assertTrue(updateResponse.getStatus());
 
-    refreshRoutersCaches();
-  }
-
-  /**
-   * Refresh the caches of all Routers (to get the mount table).
-   */
-  private void refreshRoutersCaches() {
-    for (final Router router : routers) {
-      StateStoreService stateStore = router.getStateStore();
-      stateStore.refreshCaches(true);
-    }
+    refreshRoutersCaches(routers);
   }
 
   /**
@@ -320,8 +242,8 @@ public class TestRouterFaultTolerant {
     final String mountPoint = "/" + order + "-failsubcluster";
     final Path mountPath = new Path(mountPoint);
     LOG.info("Setup {} with order {}", mountPoint, order);
-    createMountTableEntry(mountPoint, order, namenodes.keySet());
-
+    createMountTableEntry(
+        getRandomRouter(), mountPoint, order, namenodes.keySet());
 
     LOG.info("Write in {} should succeed writing in ns0 and fail for ns1",
         mountPath);
@@ -383,7 +305,14 @@ public class TestRouterFaultTolerant {
     tasks.add(getListFailTask(router0Fs, mountPoint));
     int filesExpected = dirs0.length + results.getSuccess();
     tasks.add(getListSuccessTask(router1Fs, mountPoint, filesExpected));
-    assertEquals(2, collectResults("List "  + mountPoint, tasks).getSuccess());
+    results = collectResults("List "  + mountPoint, tasks);
+    assertEquals("Failed listing", 2, results.getSuccess());
+
+    tasks.add(getContentSummaryFailTask(router0Fs, mountPoint));
+    tasks.add(getContentSummarySuccessTask(
+        router1Fs, mountPoint, filesExpected));
+    results = collectResults("Content summary "  + mountPoint, tasks);
+    assertEquals("Failed content summary", 2, results.getSuccess());
   }
 
   /**
@@ -422,6 +351,12 @@ public class TestRouterFaultTolerant {
     tasks.add(getListFailTask(router0Fs, dir0));
     tasks.add(getListSuccessTask(router1Fs, dir0, results.getSuccess()));
     assertEquals(2, collectResults("List "  + dir0, tasks).getSuccess());
+
+    tasks.add(getContentSummaryFailTask(router0Fs, dir0));
+    tasks.add(getContentSummarySuccessTask(
+        router1Fs, dir0, results.getSuccess()));
+    results = collectResults("Content summary "  + dir0, tasks);
+    assertEquals(2, results.getSuccess());
   }
 
   /**
@@ -534,6 +469,42 @@ public class TestRouterFaultTolerant {
     };
   }
 
+
+  /**
+   * Task that gets the content summary of a directory and expects to fail.
+   * @param fs File system to check.
+   * @param path Path to try to get the content summary for.
+   * @return If getting the content summary failed as expected.
+   */
+  private static Callable<Boolean> getContentSummaryFailTask(
+      FileSystem fs, Path path) {
+    return () -> {
+      try {
+        fs.getContentSummary(path);
+        return false;
+      } catch (RemoteException re) {
+        return true;
+      }
+    };
+  }
+
+  /**
+   * Task that gets the content summary of a directory and succeeds.
+   * @param fs File system to check.
+   * @param path Path to get the content summary for.
+   * @param expected Expected number of files and directories.
+   * @return If getting the content summary succeeds.
+   */
+  private static Callable<Boolean> getContentSummarySuccessTask(
+      FileSystem fs, Path path, int expected) {
+    return () -> {
+      ContentSummary summary = fs.getContentSummary(path);
+      assertEquals("Wrong summary for " + path,
+          expected, summary.getFileAndDirectoryCount());
+      return true;
+    };
+  }
+
   /**
    * Invoke a set of tasks and collect their outputs.
    * The tasks should do assertions.
@@ -556,7 +527,14 @@ public class TestRouterFaultTolerant {
           results.incrFailure();
         }
       } catch (Exception e) {
-        fail(e.getMessage());
+        StringWriter stackTrace = new StringWriter();
+        PrintWriter writer = new PrintWriter(stackTrace);
+        if (e instanceof ExecutionException) {
+          e.getCause().printStackTrace(writer);
+        } else {
+          e.printStackTrace(writer);
+        }
+        fail("Failed to run \"" + tag + "\": " + stackTrace);
       }
     });
     tasks.clear();
@@ -631,24 +609,4 @@ public class TestRouterFaultTolerant {
     return userUgi.doAs(
         (PrivilegedExceptionAction<FileSystem>) () -> getFileSystem(router));
   }
-
-  private static FileSystem getFileSystem(int rpcPort) throws IOException {
-    Configuration conf = new HdfsConfiguration();
-    URI uri = URI.create("hdfs://localhost:" + rpcPort);
-    return DistributedFileSystem.get(uri, conf);
-  }
-
-  private static FileSystem getFileSystem(final Router router)
-      throws IOException {
-    InetSocketAddress rpcAddress = router.getRpcServerAddress();
-    int rpcPort = rpcAddress.getPort();
-    return getFileSystem(rpcPort);
-  }
-
-  private static RouterClient getAdminClient(
-      final Router router) throws IOException {
-    Configuration conf = new HdfsConfiguration();
-    InetSocketAddress routerSocket = router.getAdminServerAddress();
-    return new RouterClient(routerSocket, conf);
-  }
 }

+ 182 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMissingFolderMulti.java

@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static java.util.Arrays.asList;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createMountTableEntry;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.getFileSystem;
+import static org.apache.hadoop.hdfs.server.federation.MockNamenode.registerSubclusters;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
+import static org.junit.Assert.assertEquals;
+
+import java.io.FileNotFoundException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.federation.MockNamenode;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Test the behavior when accessing a mount point mapped to multiple
+ * subclusters and the folder is missing in some or all of them.
+ */
+public class TestRouterMissingFolderMulti {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestRouterMissingFolderMulti.class);
+
+  /** Number of files to create for testing. */
+  private static final int NUM_FILES = 10;
+
+  /** Namenodes for the test per name service id (subcluster). */
+  private Map<String, MockNamenode> namenodes = new HashMap<>();
+  /** Router for the test. */
+  private Router router;
+
+
+  @Before
+  public void setup() throws Exception {
+    LOG.info("Start the Namenodes");
+    Configuration nnConf = new HdfsConfiguration();
+    nnConf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, 10);
+    for (final String nsId : asList("ns0", "ns1")) {
+      MockNamenode nn = new MockNamenode(nsId, nnConf);
+      nn.transitionToActive();
+      nn.addFileSystemMock();
+      namenodes.put(nsId, nn);
+    }
+
+    LOG.info("Start the Routers");
+    Configuration routerConf = new RouterConfigBuilder()
+        .stateStore()
+        .admin()
+        .rpc()
+        .build();
+    routerConf.set(RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "0.0.0.0:0");
+    routerConf.set(RBFConfigKeys.DFS_ROUTER_HTTP_ADDRESS_KEY, "0.0.0.0:0");
+    routerConf.set(RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY, "0.0.0.0:0");
+
+    Configuration stateStoreConf = getStateStoreConfiguration();
+    stateStoreConf.setClass(
+        RBFConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
+        MembershipNamenodeResolver.class, ActiveNamenodeResolver.class);
+    stateStoreConf.setClass(
+        RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
+        MultipleDestinationMountTableResolver.class,
+        FileSubclusterResolver.class);
+    routerConf.addResource(stateStoreConf);
+
+    routerConf.setBoolean(RBFConfigKeys.DFS_ROUTER_ALLOW_PARTIAL_LIST, false);
+
+    router = new Router();
+    router.init(routerConf);
+    router.start();
+
+    LOG.info("Registering the subclusters in the Routers");
+    registerSubclusters(router, namenodes.values());
+  }
+
+  @After
+  public void cleanup() throws Exception {
+    LOG.info("Stopping the cluster");
+    for (final MockNamenode nn : namenodes.values()) {
+      nn.stop();
+    }
+    namenodes.clear();
+
+    if (router != null) {
+      router.stop();
+      router = null;
+    }
+  }
+
+  @Test
+  public void testSuccess() throws Exception {
+    FileSystem fs = getFileSystem(router);
+    String mountPoint = "/test-success";
+    createMountTableEntry(router, mountPoint,
+        DestinationOrder.HASH_ALL, namenodes.keySet());
+    Path folder = new Path(mountPoint, "folder-all");
+    for (int i = 0; i < NUM_FILES; i++) {
+      Path file = new Path(folder, "file-" + i + ".txt");
+      FSDataOutputStream os = fs.create(file);
+      os.close();
+    }
+    FileStatus[] files = fs.listStatus(folder);
+    assertEquals(NUM_FILES, files.length);
+    ContentSummary contentSummary = fs.getContentSummary(folder);
+    assertEquals(NUM_FILES, contentSummary.getFileCount());
+  }
+
+  @Test
+  public void testFileNotFound() throws Exception {
+    FileSystem fs = getFileSystem(router);
+    String mountPoint = "/test-non-existing";
+    createMountTableEntry(router,
+        mountPoint, DestinationOrder.HASH_ALL, namenodes.keySet());
+    Path path = new Path(mountPoint, "folder-all");
+    LambdaTestUtils.intercept(FileNotFoundException.class,
+        () -> fs.listStatus(path));
+    LambdaTestUtils.intercept(FileNotFoundException.class,
+        () -> fs.getContentSummary(path));
+  }
+
+  @Test
+  public void testOneMissing() throws Exception {
+    FileSystem fs = getFileSystem(router);
+    String mountPoint = "/test-one-missing";
+    createMountTableEntry(router, mountPoint,
+        DestinationOrder.HASH_ALL, namenodes.keySet());
+
+    // Create the folders directly in only one of the Namenodes
+    MockNamenode nn = namenodes.get("ns0");
+    int nnRpcPort = nn.getRPCPort();
+    FileSystem nnFs = getFileSystem(nnRpcPort);
+    Path folder = new Path(mountPoint, "folder-all");
+    for (int i = 0; i < NUM_FILES; i++) {
+      Path file = new Path(folder, "file-" + i + ".txt");
+      FSDataOutputStream os = nnFs.create(file);
+      os.close();
+    }
+
+    FileStatus[] files = fs.listStatus(folder);
+    assertEquals(NUM_FILES, files.length);
+    ContentSummary summary = fs.getContentSummary(folder);
+    assertEquals(NUM_FILES, summary.getFileAndDirectoryCount());
+  }
+}