Browse Source

HADOOP-18193:Support nested mount points in INodeTree

Fixes #4181

Signed-off-by: Owen O'Malley <oomalley@linkedin.com>
Lei Yang 3 years ago
parent
commit
6a95c3a039

+ 18 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ConfigUtil.java

@@ -247,4 +247,22 @@ public class ConfigUtil {
     return conf.get(Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE_NAME_KEY,
     return conf.get(Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE_NAME_KEY,
         Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE);
         Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE);
   }
   }
+
+  /**
+   * Check the bool config whether nested mount point is supported. Default: true
+   * @param conf - from this conf
+   * @return whether nested mount point is supported
+   */
+  public static boolean isNestedMountPointSupported(final Configuration conf) {
+    return conf.getBoolean(Constants.CONFIG_NESTED_MOUNT_POINT_SUPPORTED, true);
+  }
+
+  /**
+   * Set the bool value isNestedMountPointSupported in config.
+   * @param conf - from this conf
+   * @param isNestedMountPointSupported - whether nested mount point is supported
+   */
+  public static void setIsNestedMountPointSupported(final Configuration conf, boolean isNestedMountPointSupported) {
+    conf.setBoolean(Constants.CONFIG_NESTED_MOUNT_POINT_SUPPORTED, isNestedMountPointSupported);
+  }
 }
 }

+ 9 - 4
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/Constants.java

@@ -35,7 +35,7 @@ public interface Constants {
    * Prefix for the config variable for the ViewFs mount-table path.
    * Prefix for the config variable for the ViewFs mount-table path.
    */
    */
   String CONFIG_VIEWFS_MOUNTTABLE_PATH = CONFIG_VIEWFS_PREFIX + ".path";
   String CONFIG_VIEWFS_MOUNTTABLE_PATH = CONFIG_VIEWFS_PREFIX + ".path";
- 
+
   /**
   /**
    * Prefix for the home dir for the mount table - if not specified
    * Prefix for the home dir for the mount table - if not specified
    * then the hadoop default value (/user) is used.
    * then the hadoop default value (/user) is used.
@@ -53,12 +53,17 @@ public interface Constants {
    */
    */
   public static final String CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE = "default";
   public static final String CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE = "default";
 
 
+  /**
+   * Config to enable nested mount point in viewfs
+   */
+  String CONFIG_NESTED_MOUNT_POINT_SUPPORTED = CONFIG_VIEWFS_PREFIX + ".nested.mount.point.supported";
+
   /**
   /**
    * Config variable full prefix for the default mount table.
    * Config variable full prefix for the default mount table.
    */
    */
-  public static final String CONFIG_VIEWFS_PREFIX_DEFAULT_MOUNT_TABLE = 
+  public static final String CONFIG_VIEWFS_PREFIX_DEFAULT_MOUNT_TABLE =
           CONFIG_VIEWFS_PREFIX + "." + CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE;
           CONFIG_VIEWFS_PREFIX + "." + CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE;
-  
+
   /**
   /**
    * Config variable for specifying a simple link
    * Config variable for specifying a simple link
    */
    */
@@ -82,7 +87,7 @@ public interface Constants {
 
 
   /**
   /**
    * Config variable for specifying a merge of the root of the mount-table
    * Config variable for specifying a merge of the root of the mount-table
-   *  with the root of another file system. 
+   *  with the root of another file system.
    */
    */
   String CONFIG_VIEWFS_LINK_MERGE_SLASH = "linkMergeSlash";
   String CONFIG_VIEWFS_LINK_MERGE_SLASH = "linkMergeSlash";
 
 

+ 147 - 33
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java

@@ -17,6 +17,10 @@
  */
  */
 package org.apache.hadoop.fs.viewfs;
 package org.apache.hadoop.fs.viewfs;
 
 
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Set;
+import java.util.TreeSet;
 import java.util.function.Function;
 import java.util.function.Function;
 import org.apache.hadoop.util.Preconditions;
 import org.apache.hadoop.util.Preconditions;
 import java.io.FileNotFoundException;
 import java.io.FileNotFoundException;
@@ -81,6 +85,8 @@ public abstract class InodeTree<T> {
   private List<RegexMountPoint<T>> regexMountPointList =
   private List<RegexMountPoint<T>> regexMountPointList =
       new ArrayList<RegexMountPoint<T>>();
       new ArrayList<RegexMountPoint<T>>();
 
 
+  private final boolean isNestedMountPointSupported;
+
   public static class MountPoint<T> {
   public static class MountPoint<T> {
     String src;
     String src;
     INodeLink<T> target;
     INodeLink<T> target;
@@ -99,7 +105,7 @@ public abstract class InodeTree<T> {
     }
     }
 
 
     /**
     /**
-     * Returns the target link.
+     * Returns the target INode link.
      * @return The target INode link
      * @return The target INode link
      */
      */
     public INodeLink<T> getTarget() {
     public INodeLink<T> getTarget() {
@@ -138,6 +144,14 @@ public abstract class InodeTree<T> {
     boolean isLink() {
     boolean isLink() {
       return !isInternalDir();
       return !isInternalDir();
     }
     }
+
+    /**
+     * Return the link if isLink.
+     * @return will return null, for non links.
+     */
+    INodeLink<T> getLink() {
+      return null;
+    }
   }
   }
 
 
   /**
   /**
@@ -212,6 +226,51 @@ public abstract class InodeTree<T> {
       }
       }
       children.put(pathComponent, link);
       children.put(pathComponent, link);
     }
     }
+
+    void addDirLink(final String pathComponent, final INodeDirLink<T> dirLink) {
+      children.put(pathComponent, dirLink);
+    }
+  }
+
+  /**
+   * Internal class to represent an INodeDir which also contains a INodeLink. This is used to support nested mount points
+   * where an INode is internalDir but points to a mount link. The class is a subclass of INodeDir and the semantics are
+   * as follows:
+   * isLink(): true
+   * isInternalDir(): true
+   * @param <T>
+   */
+  static class INodeDirLink<T> extends INodeDir<T> {
+    /**
+     * INodeLink wrapped in the INodeDir
+     */
+    private final INodeLink<T> link;
+
+    INodeDirLink(String pathToNode, UserGroupInformation aUgi, INodeLink<T> link) {
+      super(pathToNode, aUgi);
+      this.link = link;
+    }
+
+    @Override
+    INodeLink<T> getLink() {
+      return link;
+    }
+
+    /**
+     * True because the INodeDirLink also contains a INodeLink
+     */
+    @Override
+    boolean isLink() {
+      return true;
+    }
+
+    /**
+     * True because the INodeDirLink is internal node
+     */
+    @Override
+    boolean isInternalDir() {
+      return true;
+    }
   }
   }
 
 
   /**
   /**
@@ -320,6 +379,11 @@ public abstract class InodeTree<T> {
       return false;
       return false;
     }
     }
 
 
+    @Override
+    INodeLink<T> getLink() {
+      return this;
+    }
+
     /**
     /**
      * Get the instance of FileSystem to use, creating one if needed.
      * Get the instance of FileSystem to use, creating one if needed.
      * @return An Initialized instance of T
      * @return An Initialized instance of T
@@ -376,10 +440,17 @@ public abstract class InodeTree<T> {
         newDir.setInternalDirFs(getTargetFileSystem(newDir));
         newDir.setInternalDirFs(getTargetFileSystem(newDir));
         nextInode = newDir;
         nextInode = newDir;
       }
       }
-      if (nextInode.isLink()) {
-        // Error - expected a dir but got a link
-        throw new FileAlreadyExistsException("Path " + nextInode.fullPath +
-            " already exists as link");
+      if (!nextInode.isInternalDir()) {
+        if (isNestedMountPointSupported) {
+          // nested mount detected, add a new INodeDirLink that wraps existing INodeLink to INodeTree and override existing INodelink
+          INodeDirLink<T> dirLink = new INodeDirLink<T>(nextInode.fullPath, aUgi, (INodeLink<T>) nextInode);
+          curInode.addDirLink(iPath, dirLink);
+          curInode = dirLink;
+        } else {
+          // Error - expected a dir but got a link
+          throw new FileAlreadyExistsException("Path " + nextInode.fullPath +
+              " already exists as link");
+        }
       } else {
       } else {
         assert(nextInode.isInternalDir());
         assert(nextInode.isInternalDir());
         curInode = (INodeDir<T>) nextInode;
         curInode = (INodeDir<T>) nextInode;
@@ -445,7 +516,7 @@ public abstract class InodeTree<T> {
   }
   }
 
 
   private INodeLink<T> getRootLink() {
   private INodeLink<T> getRootLink() {
-    Preconditions.checkState(root.isLink());
+    Preconditions.checkState(!root.isInternalDir());
     return (INodeLink<T>)root;
     return (INodeLink<T>)root;
   }
   }
 
 
@@ -538,6 +609,7 @@ public abstract class InodeTree<T> {
       mountTableName = ConfigUtil.getDefaultMountTableName(config);
       mountTableName = ConfigUtil.getDefaultMountTableName(config);
     }
     }
     homedirPrefix = ConfigUtil.getHomeDirValue(config, mountTableName);
     homedirPrefix = ConfigUtil.getHomeDirValue(config, mountTableName);
+    isNestedMountPointSupported = ConfigUtil.isNestedMountPointSupported(config);
 
 
     boolean isMergeSlashConfigured = false;
     boolean isMergeSlashConfigured = false;
     String mergeSlashTarget = null;
     String mergeSlashTarget = null;
@@ -642,7 +714,8 @@ public abstract class InodeTree<T> {
       getRootDir().setInternalDirFs(getTargetFileSystem(getRootDir()));
       getRootDir().setInternalDirFs(getTargetFileSystem(getRootDir()));
       getRootDir().setRoot(true);
       getRootDir().setRoot(true);
       INodeLink<T> fallbackLink = null;
       INodeLink<T> fallbackLink = null;
-      for (LinkEntry le : linkEntries) {
+
+      for (LinkEntry le : getLinkEntries(linkEntries)) {
         switch (le.getLinkType()) {
         switch (le.getLinkType()) {
         case SINGLE_FALLBACK:
         case SINGLE_FALLBACK:
           if (fallbackLink != null) {
           if (fallbackLink != null) {
@@ -682,6 +755,32 @@ public abstract class InodeTree<T> {
     }
     }
   }
   }
 
 
+  /**
+   * Get collection of linkEntry. Sort mount point based on alphabetical order of the src paths.
+   * The purpose is to group nested paths(shortest path always comes first) during INodeTree creation.
+   * E.g. /foo is nested with /foo/bar so an INodeDirLink will be created at /foo.
+   * @param linkEntries input linkEntries
+   * @return sorted linkEntries
+   */
+  private Collection<LinkEntry> getLinkEntries(List<LinkEntry> linkEntries) {
+    Set<LinkEntry> sortedLinkEntries = new TreeSet<>(new Comparator<LinkEntry>() {
+      @Override
+      public int compare(LinkEntry o1, LinkEntry o2) {
+        if (o1 == null) {
+          return -1;
+        }
+        if (o2 == null) {
+          return 1;
+        }
+        String src1 = o1.getSrc();
+        String src2=  o2.getSrc();
+        return src1.compareTo(src2);
+      }
+    });
+    sortedLinkEntries.addAll(linkEntries);
+    return sortedLinkEntries;
+  }
+
   private void checkMntEntryKeyEqualsTarget(
   private void checkMntEntryKeyEqualsTarget(
       String mntEntryKey, String targetMntEntryKey) throws IOException {
       String mntEntryKey, String targetMntEntryKey) throws IOException {
     if (!mntEntryKey.equals(targetMntEntryKey)) {
     if (!mntEntryKey.equals(targetMntEntryKey)) {
@@ -795,7 +894,7 @@ public abstract class InodeTree<T> {
      * been linked to the root directory of a file system.
      * been linked to the root directory of a file system.
      * The first non-slash path component should be name of the mount table.
      * The first non-slash path component should be name of the mount table.
      */
      */
-    if (root.isLink()) {
+    if (!root.isInternalDir()) {
       Path remainingPath;
       Path remainingPath;
       StringBuilder remainingPathStr = new StringBuilder();
       StringBuilder remainingPathStr = new StringBuilder();
       // ignore first slash
       // ignore first slash
@@ -818,10 +917,17 @@ public abstract class InodeTree<T> {
     }
     }
 
 
     int i;
     int i;
+    INodeDirLink<T> lastResolvedDirLink = null;
+    int lastResolvedDirLinkIndex = -1;
     // ignore first slash
     // ignore first slash
     for (i = 1; i < path.length - (resolveLastComponent ? 0 : 1); i++) {
     for (i = 1; i < path.length - (resolveLastComponent ? 0 : 1); i++) {
       INode<T> nextInode = curInode.resolveInternal(path[i]);
       INode<T> nextInode = curInode.resolveInternal(path[i]);
       if (nextInode == null) {
       if (nextInode == null) {
+        // first resolve to dirlink for nested mount point
+        if (isNestedMountPointSupported && lastResolvedDirLink != null) {
+          return new ResolveResult<T>(ResultKind.EXTERNAL_DIR, lastResolvedDirLink.getLink().getTargetFileSystem(),
+              lastResolvedDirLink.fullPath, getRemainingPath(path, i),true);
+        }
         if (hasFallbackLink()) {
         if (hasFallbackLink()) {
           resolveResult = new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
           resolveResult = new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
               getRootFallbackLink().getTargetFileSystem(), root.fullPath,
               getRootFallbackLink().getTargetFileSystem(), root.fullPath,
@@ -837,46 +943,54 @@ public abstract class InodeTree<T> {
         }
         }
       }
       }
 
 
-      if (nextInode.isLink()) {
+      if (!nextInode.isInternalDir()) {
         final INodeLink<T> link = (INodeLink<T>) nextInode;
         final INodeLink<T> link = (INodeLink<T>) nextInode;
-        final Path remainingPath;
-        if (i >= path.length - 1) {
-          remainingPath = SlashPath;
-        } else {
-          StringBuilder remainingPathStr =
-              new StringBuilder("/" + path[i + 1]);
-          for (int j = i + 2; j < path.length; ++j) {
-            remainingPathStr.append('/').append(path[j]);
-          }
-          remainingPath = new Path(remainingPathStr.toString());
-        }
+        final Path remainingPath = getRemainingPath(path, i + 1);
         resolveResult = new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
         resolveResult = new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
             link.getTargetFileSystem(), nextInode.fullPath, remainingPath,
             link.getTargetFileSystem(), nextInode.fullPath, remainingPath,
             true);
             true);
         return resolveResult;
         return resolveResult;
-      } else if (nextInode.isInternalDir()) {
+      } else {
         curInode = (INodeDir<T>) nextInode;
         curInode = (INodeDir<T>) nextInode;
+        // track last resolved nest mount point.
+        if (isNestedMountPointSupported && nextInode.isLink()) {
+          lastResolvedDirLink = (INodeDirLink<T>) nextInode;
+          lastResolvedDirLinkIndex = i;
+        }
       }
       }
     }
     }
 
 
-    // We have resolved to an internal dir in mount table.
     Path remainingPath;
     Path remainingPath;
-    if (resolveLastComponent) {
+    if (isNestedMountPointSupported && lastResolvedDirLink != null) {
+      remainingPath = getRemainingPath(path, lastResolvedDirLinkIndex + 1);
+      resolveResult = new ResolveResult<T>(ResultKind.EXTERNAL_DIR, lastResolvedDirLink.getLink().getTargetFileSystem(),
+          lastResolvedDirLink.fullPath, remainingPath,true);
+    } else {
+      remainingPath = resolveLastComponent ? SlashPath : getRemainingPath(path, i);
+      resolveResult = new ResolveResult<T>(ResultKind.INTERNAL_DIR, curInode.getInternalDirFs(),
+          curInode.fullPath, remainingPath, false);
+    }
+    return resolveResult;
+  }
+
+  /**
+   * Return remaining path from specified index to the end of the path array.
+   * @param path An array of path components split by slash
+   * @param startIndex the specified start index of the path array
+   * @return remaining path.
+   */
+  private Path getRemainingPath(String[] path, int startIndex) {
+    Path remainingPath;
+    if (startIndex >= path.length) {
       remainingPath = SlashPath;
       remainingPath = SlashPath;
     } else {
     } else {
-      // note we have taken care of when path is "/" above
-      // for internal dirs rem-path does not start with / since the lookup
-      // that follows will do a children.get(remaningPath) and will have to
-      // strip-out the initial /
-      StringBuilder remainingPathStr = new StringBuilder("/" + path[i]);
-      for (int j = i + 1; j < path.length; ++j) {
-        remainingPathStr.append('/').append(path[j]);
+      StringBuilder remainingPathStr = new StringBuilder();
+      for (int j = startIndex; j < path.length; j++) {
+        remainingPathStr.append("/").append(path[j]);
       }
       }
       remainingPath = new Path(remainingPathStr.toString());
       remainingPath = new Path(remainingPathStr.toString());
     }
     }
-    resolveResult = new ResolveResult<T>(ResultKind.INTERNAL_DIR,
-        curInode.getInternalDirFs(), curInode.fullPath, remainingPath, false);
-    return resolveResult;
+    return remainingPath;
   }
   }
 
 
   /**
   /**

+ 53 - 53
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java

@@ -253,9 +253,9 @@ public class ViewFileSystem extends FileSystem {
   private RenameStrategy renameStrategy = RenameStrategy.SAME_MOUNTPOINT;
   private RenameStrategy renameStrategy = RenameStrategy.SAME_MOUNTPOINT;
   /**
   /**
    * Make the path Absolute and get the path-part of a pathname.
    * Make the path Absolute and get the path-part of a pathname.
-   * Checks that URI matches this file system 
+   * Checks that URI matches this file system
    * and that the path-part is a valid name.
    * and that the path-part is a valid name.
-   * 
+   *
    * @param p path
    * @param p path
    * @return path-part of the Path p
    * @return path-part of the Path p
    */
    */
@@ -263,17 +263,17 @@ public class ViewFileSystem extends FileSystem {
     checkPath(p);
     checkPath(p);
     return makeAbsolute(p).toUri().getPath();
     return makeAbsolute(p).toUri().getPath();
   }
   }
-  
+
   private Path makeAbsolute(final Path f) {
   private Path makeAbsolute(final Path f) {
     return f.isAbsolute() ? f : new Path(workingDir, f);
     return f.isAbsolute() ? f : new Path(workingDir, f);
   }
   }
-  
+
   /**
   /**
    * This is the  constructor with the signature needed by
    * This is the  constructor with the signature needed by
    * {@link FileSystem#createFileSystem(URI, Configuration)}
    * {@link FileSystem#createFileSystem(URI, Configuration)}
-   * 
+   *
    * After this constructor is called initialize() is called.
    * After this constructor is called initialize() is called.
-   * @throws IOException 
+   * @throws IOException
    */
    */
   public ViewFileSystem() throws IOException {
   public ViewFileSystem() throws IOException {
     ugi = UserGroupInformation.getCurrentUser();
     ugi = UserGroupInformation.getCurrentUser();
@@ -392,7 +392,7 @@ public class ViewFileSystem extends FileSystem {
     this();
     this();
     initialize(theUri, conf);
     initialize(theUri, conf);
   }
   }
-  
+
   /**
   /**
    * Convenience Constructor for apps to call directly
    * Convenience Constructor for apps to call directly
    * @param conf
    * @param conf
@@ -401,12 +401,12 @@ public class ViewFileSystem extends FileSystem {
   public ViewFileSystem(final Configuration conf) throws IOException {
   public ViewFileSystem(final Configuration conf) throws IOException {
     this(FsConstants.VIEWFS_URI, conf);
     this(FsConstants.VIEWFS_URI, conf);
   }
   }
-  
+
   @Override
   @Override
   public URI getUri() {
   public URI getUri() {
     return myUri;
     return myUri;
   }
   }
-  
+
   @Override
   @Override
   public Path resolvePath(final Path f) throws IOException {
   public Path resolvePath(final Path f) throws IOException {
     final InodeTree.ResolveResult<FileSystem> res;
     final InodeTree.ResolveResult<FileSystem> res;
@@ -416,7 +416,7 @@ public class ViewFileSystem extends FileSystem {
     }
     }
     return res.targetFileSystem.resolvePath(res.remainingPath);
     return res.targetFileSystem.resolvePath(res.remainingPath);
   }
   }
-  
+
   @Override
   @Override
   public Path getHomeDirectory() {
   public Path getHomeDirectory() {
     if (homeDir == null) {
     if (homeDir == null) {
@@ -424,13 +424,13 @@ public class ViewFileSystem extends FileSystem {
       if (base == null) {
       if (base == null) {
         base = "/user";
         base = "/user";
       }
       }
-      homeDir = (base.equals("/") ? 
+      homeDir = (base.equals("/") ?
           this.makeQualified(new Path(base + ugi.getShortUserName())):
           this.makeQualified(new Path(base + ugi.getShortUserName())):
           this.makeQualified(new Path(base + "/" + ugi.getShortUserName())));
           this.makeQualified(new Path(base + "/" + ugi.getShortUserName())));
     }
     }
     return homeDir;
     return homeDir;
   }
   }
-  
+
   @Override
   @Override
   public Path getWorkingDirectory() {
   public Path getWorkingDirectory() {
     return workingDir;
     return workingDir;
@@ -441,11 +441,11 @@ public class ViewFileSystem extends FileSystem {
     getUriPath(new_dir); // this validates the path
     getUriPath(new_dir); // this validates the path
     workingDir = makeAbsolute(new_dir);
     workingDir = makeAbsolute(new_dir);
   }
   }
-  
+
   @Override
   @Override
   public FSDataOutputStream append(final Path f, final int bufferSize,
   public FSDataOutputStream append(final Path f, final int bufferSize,
       final Progressable progress) throws IOException {
       final Progressable progress) throws IOException {
-    InodeTree.ResolveResult<FileSystem> res = 
+    InodeTree.ResolveResult<FileSystem> res =
       fsState.resolve(getUriPath(f), true);
       fsState.resolve(getUriPath(f), true);
     return res.targetFileSystem.append(res.remainingPath, bufferSize, progress);
     return res.targetFileSystem.append(res.remainingPath, bufferSize, progress);
   }
   }
@@ -464,7 +464,7 @@ public class ViewFileSystem extends FileSystem {
     return res.targetFileSystem.createNonRecursive(res.remainingPath,
     return res.targetFileSystem.createNonRecursive(res.remainingPath,
         permission, flags, bufferSize, replication, blockSize, progress);
         permission, flags, bufferSize, replication, blockSize, progress);
   }
   }
-  
+
   @Override
   @Override
   public FSDataOutputStream create(final Path f, final FsPermission permission,
   public FSDataOutputStream create(final Path f, final FsPermission permission,
       final boolean overwrite, final int bufferSize, final short replication,
       final boolean overwrite, final int bufferSize, final short replication,
@@ -480,11 +480,11 @@ public class ViewFileSystem extends FileSystem {
         overwrite, bufferSize, replication, blockSize, progress);
         overwrite, bufferSize, replication, blockSize, progress);
   }
   }
 
 
-  
+
   @Override
   @Override
   public boolean delete(final Path f, final boolean recursive)
   public boolean delete(final Path f, final boolean recursive)
       throws AccessControlException, FileNotFoundException, IOException {
       throws AccessControlException, FileNotFoundException, IOException {
-    InodeTree.ResolveResult<FileSystem> res = 
+    InodeTree.ResolveResult<FileSystem> res =
         fsState.resolve(getUriPath(f), true);
         fsState.resolve(getUriPath(f), true);
     // If internal dir or target is a mount link (ie remainingPath is Slash)
     // If internal dir or target is a mount link (ie remainingPath is Slash)
     if (res.isInternalDir() || res.remainingPath == InodeTree.SlashPath) {
     if (res.isInternalDir() || res.remainingPath == InodeTree.SlashPath) {
@@ -492,18 +492,18 @@ public class ViewFileSystem extends FileSystem {
     }
     }
     return res.targetFileSystem.delete(res.remainingPath, recursive);
     return res.targetFileSystem.delete(res.remainingPath, recursive);
   }
   }
-  
+
   @Override
   @Override
   @SuppressWarnings("deprecation")
   @SuppressWarnings("deprecation")
   public boolean delete(final Path f)
   public boolean delete(final Path f)
       throws AccessControlException, FileNotFoundException, IOException {
       throws AccessControlException, FileNotFoundException, IOException {
     return delete(f, true);
     return delete(f, true);
   }
   }
-  
+
   @Override
   @Override
-  public BlockLocation[] getFileBlockLocations(FileStatus fs, 
+  public BlockLocation[] getFileBlockLocations(FileStatus fs,
       long start, long len) throws IOException {
       long start, long len) throws IOException {
-    final InodeTree.ResolveResult<FileSystem> res = 
+    final InodeTree.ResolveResult<FileSystem> res =
       fsState.resolve(getUriPath(fs.getPath()), true);
       fsState.resolve(getUriPath(fs.getPath()), true);
     return res.targetFileSystem.getFileBlockLocations(
     return res.targetFileSystem.getFileBlockLocations(
         new ViewFsFileStatus(fs, res.remainingPath), start, len);
         new ViewFsFileStatus(fs, res.remainingPath), start, len);
@@ -513,7 +513,7 @@ public class ViewFileSystem extends FileSystem {
   public FileChecksum getFileChecksum(final Path f)
   public FileChecksum getFileChecksum(final Path f)
       throws AccessControlException, FileNotFoundException,
       throws AccessControlException, FileNotFoundException,
       IOException {
       IOException {
-    InodeTree.ResolveResult<FileSystem> res = 
+    InodeTree.ResolveResult<FileSystem> res =
       fsState.resolve(getUriPath(f), true);
       fsState.resolve(getUriPath(f), true);
     return res.targetFileSystem.getFileChecksum(res.remainingPath);
     return res.targetFileSystem.getFileChecksum(res.remainingPath);
   }
   }
@@ -570,7 +570,7 @@ public class ViewFileSystem extends FileSystem {
     FileStatus status =  res.targetFileSystem.getFileStatus(res.remainingPath);
     FileStatus status =  res.targetFileSystem.getFileStatus(res.remainingPath);
     return fixFileStatus(status, this.makeQualified(f));
     return fixFileStatus(status, this.makeQualified(f));
   }
   }
-  
+
   @Override
   @Override
   public void access(Path path, FsAction mode) throws AccessControlException,
   public void access(Path path, FsAction mode) throws AccessControlException,
       FileNotFoundException, IOException {
       FileNotFoundException, IOException {
@@ -611,7 +611,7 @@ public class ViewFileSystem extends FileSystem {
       FileNotFoundException, IOException {
       FileNotFoundException, IOException {
     InodeTree.ResolveResult<FileSystem> res =
     InodeTree.ResolveResult<FileSystem> res =
       fsState.resolve(getUriPath(f), true);
       fsState.resolve(getUriPath(f), true);
-    
+
     FileStatus[] statusLst = res.targetFileSystem.listStatus(res.remainingPath);
     FileStatus[] statusLst = res.targetFileSystem.listStatus(res.remainingPath);
     if (!res.isInternalDir()) {
     if (!res.isInternalDir()) {
       // We need to change the name in the FileStatus as described in
       // We need to change the name in the FileStatus as described in
@@ -675,7 +675,7 @@ public class ViewFileSystem extends FileSystem {
   @Override
   @Override
   public boolean mkdirs(final Path dir, final FsPermission permission)
   public boolean mkdirs(final Path dir, final FsPermission permission)
       throws IOException {
       throws IOException {
-    InodeTree.ResolveResult<FileSystem> res = 
+    InodeTree.ResolveResult<FileSystem> res =
         fsState.resolve(getUriPath(dir), false);
         fsState.resolve(getUriPath(dir), false);
     return res.targetFileSystem.mkdirs(res.remainingPath, permission);
     return res.targetFileSystem.mkdirs(res.remainingPath, permission);
   }
   }
@@ -683,15 +683,15 @@ public class ViewFileSystem extends FileSystem {
   @Override
   @Override
   public FSDataInputStream open(final Path f, final int bufferSize)
   public FSDataInputStream open(final Path f, final int bufferSize)
       throws AccessControlException, FileNotFoundException, IOException {
       throws AccessControlException, FileNotFoundException, IOException {
-    InodeTree.ResolveResult<FileSystem> res = 
+    InodeTree.ResolveResult<FileSystem> res =
         fsState.resolve(getUriPath(f), true);
         fsState.resolve(getUriPath(f), true);
     return res.targetFileSystem.open(res.remainingPath, bufferSize);
     return res.targetFileSystem.open(res.remainingPath, bufferSize);
   }
   }
 
 
-  
+
   @Override
   @Override
   public boolean rename(final Path src, final Path dst) throws IOException {
   public boolean rename(final Path src, final Path dst) throws IOException {
-    // passing resolveLastComponet as false to catch renaming a mount point to 
+    // passing resolveLastComponet as false to catch renaming a mount point to
     // itself. We need to catch this as an internal operation and fail if no
     // itself. We need to catch this as an internal operation and fail if no
     // fallback.
     // fallback.
     InodeTree.ResolveResult<FileSystem> resSrc =
     InodeTree.ResolveResult<FileSystem> resSrc =
@@ -802,28 +802,28 @@ public class ViewFileSystem extends FileSystem {
         fsState.resolve(getUriPath(f), true);
         fsState.resolve(getUriPath(f), true);
     return res.targetFileSystem.truncate(res.remainingPath, newLength);
     return res.targetFileSystem.truncate(res.remainingPath, newLength);
   }
   }
-  
+
   @Override
   @Override
   public void setOwner(final Path f, final String username,
   public void setOwner(final Path f, final String username,
       final String groupname) throws AccessControlException,
       final String groupname) throws AccessControlException,
       FileNotFoundException, IOException {
       FileNotFoundException, IOException {
-    InodeTree.ResolveResult<FileSystem> res = 
+    InodeTree.ResolveResult<FileSystem> res =
       fsState.resolve(getUriPath(f), true);
       fsState.resolve(getUriPath(f), true);
-    res.targetFileSystem.setOwner(res.remainingPath, username, groupname); 
+    res.targetFileSystem.setOwner(res.remainingPath, username, groupname);
   }
   }
 
 
   @Override
   @Override
   public void setPermission(final Path f, final FsPermission permission)
   public void setPermission(final Path f, final FsPermission permission)
       throws AccessControlException, FileNotFoundException, IOException {
       throws AccessControlException, FileNotFoundException, IOException {
-    InodeTree.ResolveResult<FileSystem> res = 
+    InodeTree.ResolveResult<FileSystem> res =
       fsState.resolve(getUriPath(f), true);
       fsState.resolve(getUriPath(f), true);
-    res.targetFileSystem.setPermission(res.remainingPath, permission); 
+    res.targetFileSystem.setPermission(res.remainingPath, permission);
   }
   }
 
 
   @Override
   @Override
   public boolean setReplication(final Path f, final short replication)
   public boolean setReplication(final Path f, final short replication)
       throws AccessControlException, FileNotFoundException, IOException {
       throws AccessControlException, FileNotFoundException, IOException {
-    InodeTree.ResolveResult<FileSystem> res = 
+    InodeTree.ResolveResult<FileSystem> res =
       fsState.resolve(getUriPath(f), true);
       fsState.resolve(getUriPath(f), true);
     return res.targetFileSystem.setReplication(res.remainingPath, replication);
     return res.targetFileSystem.setReplication(res.remainingPath, replication);
   }
   }
@@ -831,9 +831,9 @@ public class ViewFileSystem extends FileSystem {
   @Override
   @Override
   public void setTimes(final Path f, final long mtime, final long atime)
   public void setTimes(final Path f, final long mtime, final long atime)
       throws AccessControlException, FileNotFoundException, IOException {
       throws AccessControlException, FileNotFoundException, IOException {
-    InodeTree.ResolveResult<FileSystem> res = 
+    InodeTree.ResolveResult<FileSystem> res =
       fsState.resolve(getUriPath(f), true);
       fsState.resolve(getUriPath(f), true);
-    res.targetFileSystem.setTimes(res.remainingPath, mtime, atime); 
+    res.targetFileSystem.setTimes(res.remainingPath, mtime, atime);
   }
   }
 
 
   @Override
   @Override
@@ -955,7 +955,7 @@ public class ViewFileSystem extends FileSystem {
     }
     }
     return fsMap;
     return fsMap;
   }
   }
-  
+
   @Override
   @Override
   public long getDefaultBlockSize() {
   public long getDefaultBlockSize() {
     throw new NotInMountpointException("getDefaultBlockSize");
     throw new NotInMountpointException("getDefaultBlockSize");
@@ -978,7 +978,7 @@ public class ViewFileSystem extends FileSystem {
         fsState.resolve(getUriPath(f), true);
         fsState.resolve(getUriPath(f), true);
       return res.targetFileSystem.getDefaultBlockSize(res.remainingPath);
       return res.targetFileSystem.getDefaultBlockSize(res.remainingPath);
     } catch (FileNotFoundException e) {
     } catch (FileNotFoundException e) {
-      throw new NotInMountpointException(f, "getDefaultBlockSize"); 
+      throw new NotInMountpointException(f, "getDefaultBlockSize");
     } catch (IOException e) {
     } catch (IOException e) {
       throw new RuntimeException("Not able to initialize fs in "
       throw new RuntimeException("Not able to initialize fs in "
           + " getDefaultBlockSize for path " + f + " with exception", e);
           + " getDefaultBlockSize for path " + f + " with exception", e);
@@ -992,7 +992,7 @@ public class ViewFileSystem extends FileSystem {
         fsState.resolve(getUriPath(f), true);
         fsState.resolve(getUriPath(f), true);
       return res.targetFileSystem.getDefaultReplication(res.remainingPath);
       return res.targetFileSystem.getDefaultReplication(res.remainingPath);
     } catch (FileNotFoundException e) {
     } catch (FileNotFoundException e) {
-      throw new NotInMountpointException(f, "getDefaultReplication"); 
+      throw new NotInMountpointException(f, "getDefaultReplication");
     } catch (IOException e) {
     } catch (IOException e) {
       throw new RuntimeException("Not able to initialize fs in "
       throw new RuntimeException("Not able to initialize fs in "
           + " getDefaultReplication for path " + f + " with exception", e);
           + " getDefaultReplication for path " + f + " with exception", e);
@@ -1054,11 +1054,11 @@ public class ViewFileSystem extends FileSystem {
     }
     }
     return children.toArray(new FileSystem[]{});
     return children.toArray(new FileSystem[]{});
   }
   }
-  
+
   public MountPoint[] getMountPoints() {
   public MountPoint[] getMountPoints() {
-    List<InodeTree.MountPoint<FileSystem>> mountPoints = 
+    List<InodeTree.MountPoint<FileSystem>> mountPoints =
                   fsState.getMountPoints();
                   fsState.getMountPoints();
-    
+
     MountPoint[] result = new MountPoint[mountPoints.size()];
     MountPoint[] result = new MountPoint[mountPoints.size()];
     for ( int i = 0; i < mountPoints.size(); ++i ) {
     for ( int i = 0; i < mountPoints.size(); ++i ) {
       result[i] = new MountPoint(new Path(mountPoints.get(i).src),
       result[i] = new MountPoint(new Path(mountPoints.get(i).src),
@@ -1375,9 +1375,9 @@ public class ViewFileSystem extends FileSystem {
    * are not allowed.
    * are not allowed.
    * If called on create or mkdir then this target is the parent of the
    * If called on create or mkdir then this target is the parent of the
    * directory in which one is trying to create or mkdir; hence
    * directory in which one is trying to create or mkdir; hence
-   * in this case the path name passed in is the last component. 
+   * in this case the path name passed in is the last component.
    * Otherwise this target is the end point of the path and hence
    * Otherwise this target is the end point of the path and hence
-   * the path name passed in is null. 
+   * the path name passed in is null.
    */
    */
   static class InternalDirOfViewFs extends FileSystem {
   static class InternalDirOfViewFs extends FileSystem {
     final InodeTree.INodeDir<FileSystem>  theInternalDir;
     final InodeTree.INodeDir<FileSystem>  theInternalDir;
@@ -1386,7 +1386,7 @@ public class ViewFileSystem extends FileSystem {
     final URI myUri;
     final URI myUri;
     private final boolean showMountLinksAsSymlinks;
     private final boolean showMountLinksAsSymlinks;
     private InodeTree<FileSystem> fsState;
     private InodeTree<FileSystem> fsState;
-    
+
     public InternalDirOfViewFs(final InodeTree.INodeDir<FileSystem> dir,
     public InternalDirOfViewFs(final InodeTree.INodeDir<FileSystem> dir,
         final long cTime, final UserGroupInformation ugi, URI uri,
         final long cTime, final UserGroupInformation ugi, URI uri,
         Configuration config, InodeTree fsState) throws URISyntaxException {
         Configuration config, InodeTree fsState) throws URISyntaxException {
@@ -1411,7 +1411,7 @@ public class ViewFileSystem extends FileSystem {
             "Internal implementation error: expected file name to be /");
             "Internal implementation error: expected file name to be /");
       }
       }
     }
     }
-    
+
     @Override
     @Override
     public URI getUri() {
     public URI getUri() {
       return myUri;
       return myUri;
@@ -1481,7 +1481,7 @@ public class ViewFileSystem extends FileSystem {
       checkPathIsSlash(f);
       checkPathIsSlash(f);
       throw readOnlyMountTable("delete", f);
       throw readOnlyMountTable("delete", f);
     }
     }
-    
+
     @Override
     @Override
     @SuppressWarnings("deprecation")
     @SuppressWarnings("deprecation")
     public boolean delete(final Path f)
     public boolean delete(final Path f)
@@ -1529,7 +1529,7 @@ public class ViewFileSystem extends FileSystem {
           new Path(theInternalDir.fullPath).makeQualified(
           new Path(theInternalDir.fullPath).makeQualified(
               myUri, ROOT_PATH));
               myUri, ROOT_PATH));
     }
     }
-    
+
 
 
     @Override
     @Override
     public FileStatus[] listStatus(Path f) throws AccessControlException,
     public FileStatus[] listStatus(Path f) throws AccessControlException,
@@ -1544,7 +1544,7 @@ public class ViewFileSystem extends FileSystem {
         INode<FileSystem> inode = iEntry.getValue();
         INode<FileSystem> inode = iEntry.getValue();
         Path path = new Path(inode.fullPath).makeQualified(myUri, null);
         Path path = new Path(inode.fullPath).makeQualified(myUri, null);
         if (inode.isLink()) {
         if (inode.isLink()) {
-          INodeLink<FileSystem> link = (INodeLink<FileSystem>) inode;
+          INodeLink<FileSystem> link = inode.getLink();
 
 
           if (showMountLinksAsSymlinks) {
           if (showMountLinksAsSymlinks) {
             // To maintain backward compatibility, with default option(showing
             // To maintain backward compatibility, with default option(showing
@@ -1721,7 +1721,7 @@ public class ViewFileSystem extends FileSystem {
         IOException {
         IOException {
       checkPathIsSlash(src);
       checkPathIsSlash(src);
       checkPathIsSlash(dst);
       checkPathIsSlash(dst);
-      throw readOnlyMountTable("rename", src);     
+      throw readOnlyMountTable("rename", src);
     }
     }
 
 
     @Override
     @Override
@@ -1740,7 +1740,7 @@ public class ViewFileSystem extends FileSystem {
     public void setPermission(Path f, FsPermission permission)
     public void setPermission(Path f, FsPermission permission)
         throws AccessControlException, IOException {
         throws AccessControlException, IOException {
       checkPathIsSlash(f);
       checkPathIsSlash(f);
-      throw readOnlyMountTable("setPermission", f);    
+      throw readOnlyMountTable("setPermission", f);
     }
     }
 
 
     @Override
     @Override
@@ -1754,7 +1754,7 @@ public class ViewFileSystem extends FileSystem {
     public void setTimes(Path f, long mtime, long atime)
     public void setTimes(Path f, long mtime, long atime)
         throws AccessControlException, IOException {
         throws AccessControlException, IOException {
       checkPathIsSlash(f);
       checkPathIsSlash(f);
-      throw readOnlyMountTable("setTimes", f);    
+      throw readOnlyMountTable("setTimes", f);
     }
     }
 
 
     @Override
     @Override
@@ -1766,7 +1766,7 @@ public class ViewFileSystem extends FileSystem {
     public FsServerDefaults getServerDefaults(Path f) throws IOException {
     public FsServerDefaults getServerDefaults(Path f) throws IOException {
       throw new NotInMountpointException(f, "getServerDefaults");
       throw new NotInMountpointException(f, "getServerDefaults");
     }
     }
-    
+
     @Override
     @Override
     public long getDefaultBlockSize(Path f) {
     public long getDefaultBlockSize(Path f) {
       throw new NotInMountpointException(f, "getDefaultBlockSize");
       throw new NotInMountpointException(f, "getDefaultBlockSize");

+ 79 - 81
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java

@@ -80,8 +80,8 @@ import org.slf4j.LoggerFactory;
 /**
 /**
  * ViewFs (extends the AbstractFileSystem interface) implements a client-side
  * ViewFs (extends the AbstractFileSystem interface) implements a client-side
  * mount table. The viewFs file system is implemented completely in memory on
  * mount table. The viewFs file system is implemented completely in memory on
- * the client side. The client-side mount table allows a client to provide a 
- * customized view of a file system namespace that is composed from 
+ * the client side. The client-side mount table allows a client to provide a
+ * customized view of a file system namespace that is composed from
  * one or more individual file systems (a localFs or Hdfs, S3fs, etc).
  * one or more individual file systems (a localFs or Hdfs, S3fs, etc).
  * For example one could have a mount table that provides links such as
  * For example one could have a mount table that provides links such as
  * <ul>
  * <ul>
@@ -89,26 +89,26 @@ import org.slf4j.LoggerFactory;
  * <li>  /project/foo   {@literal ->} hdfs://nnProject1/projects/foo
  * <li>  /project/foo   {@literal ->} hdfs://nnProject1/projects/foo
  * <li>  /project/bar   {@literal ->} hdfs://nnProject2/projects/bar
  * <li>  /project/bar   {@literal ->} hdfs://nnProject2/projects/bar
  * <li>  /tmp           {@literal ->} hdfs://nnTmp/privateTmpForUserXXX
  * <li>  /tmp           {@literal ->} hdfs://nnTmp/privateTmpForUserXXX
- * </ul> 
- * 
- * ViewFs is specified with the following URI: <b>viewfs:///</b> 
+ * </ul>
+ *
+ * ViewFs is specified with the following URI: <b>viewfs:///</b>
  * <p>
  * <p>
  * To use viewfs one would typically set the default file system in the
  * To use viewfs one would typically set the default file system in the
  * config  (i.e. fs.defaultFS {@literal <} = viewfs:///) along with the
  * config  (i.e. fs.defaultFS {@literal <} = viewfs:///) along with the
- * mount table config variables as described below. 
- * 
+ * mount table config variables as described below.
+ *
  * <p>
  * <p>
  * <b> ** Config variables to specify the mount table entries ** </b>
  * <b> ** Config variables to specify the mount table entries ** </b>
  * <p>
  * <p>
- * 
+ *
  * The file system is initialized from the standard Hadoop config through
  * The file system is initialized from the standard Hadoop config through
  * config variables.
  * config variables.
- * See {@link FsConstants} for URI and Scheme constants; 
- * See {@link Constants} for config var constants; 
+ * See {@link FsConstants} for URI and Scheme constants;
+ * See {@link Constants} for config var constants;
  * see {@link ConfigUtil} for convenient lib.
  * see {@link ConfigUtil} for convenient lib.
- * 
+ *
  * <p>
  * <p>
- * All the mount table config entries for view fs are prefixed by 
+ * All the mount table config entries for view fs are prefixed by
  * <b>fs.viewfs.mounttable.</b>
  * <b>fs.viewfs.mounttable.</b>
  * For example the above example can be specified with the following
  * For example the above example can be specified with the following
  *  config variables:
  *  config variables:
@@ -122,8 +122,8 @@ import org.slf4j.LoggerFactory;
  *  <li> fs.viewfs.mounttable.default.link./tmp=
  *  <li> fs.viewfs.mounttable.default.link./tmp=
  *  hdfs://nnTmp/privateTmpForUserXXX
  *  hdfs://nnTmp/privateTmpForUserXXX
  *  </ul>
  *  </ul>
- *  
- * The default mount table (when no authority is specified) is 
+ *
+ * The default mount table (when no authority is specified) is
  * from config variables prefixed by <b>fs.viewFs.mounttable.default </b>
  * from config variables prefixed by <b>fs.viewFs.mounttable.default </b>
  * The authority component of a URI can be used to specify a different mount
  * The authority component of a URI can be used to specify a different mount
  * table. For example,
  * table. For example,
@@ -131,11 +131,11 @@ import org.slf4j.LoggerFactory;
  * <li>  viewfs://sanjayMountable/
  * <li>  viewfs://sanjayMountable/
  * </ul>
  * </ul>
  * is initialized from fs.viewFs.mounttable.sanjayMountable.* config variables.
  * is initialized from fs.viewFs.mounttable.sanjayMountable.* config variables.
- * 
- *  <p> 
+ *
+ *  <p>
  *  <b> **** Merge Mounts **** </b>(NOTE: merge mounts are not implemented yet.)
  *  <b> **** Merge Mounts **** </b>(NOTE: merge mounts are not implemented yet.)
  *  <p>
  *  <p>
- *  
+ *
  *   One can also use "MergeMounts" to merge several directories (this is
  *   One can also use "MergeMounts" to merge several directories (this is
  *   sometimes  called union-mounts or junction-mounts in the literature.
  *   sometimes  called union-mounts or junction-mounts in the literature.
  *   For example of the home directories are stored on say two file systems
  *   For example of the home directories are stored on say two file systems
@@ -156,7 +156,7 @@ import org.slf4j.LoggerFactory;
  *   <li>    fs.viewfs.mounttable.default.linkMergeSlash=hdfs://nn99/
  *   <li>    fs.viewfs.mounttable.default.linkMergeSlash=hdfs://nn99/
  *   </ul>
  *   </ul>
  *   In this cases the root of the mount table is merged with the root of
  *   In this cases the root of the mount table is merged with the root of
- *            <b>hdfs://nn99/ </b> 
+ *            <b>hdfs://nn99/ </b>
  */
  */
 
 
 @InterfaceAudience.Public
 @InterfaceAudience.Public
@@ -182,8 +182,8 @@ public class ViewFs extends AbstractFileSystem {
       final Path p) {
       final Path p) {
     return readOnlyMountTable(operation, p.toString());
     return readOnlyMountTable(operation, p.toString());
   }
   }
-  
-  
+
+
   static public class MountPoint {
   static public class MountPoint {
     // the src of the mount
     // the src of the mount
     private Path src;
     private Path src;
@@ -214,15 +214,15 @@ public class ViewFs extends AbstractFileSystem {
       URISyntaxException {
       URISyntaxException {
     this(FsConstants.VIEWFS_URI, conf);
     this(FsConstants.VIEWFS_URI, conf);
   }
   }
-  
+
   /**
   /**
    * This constructor has the signature needed by
    * This constructor has the signature needed by
    * {@link AbstractFileSystem#createFileSystem(URI, Configuration)}.
    * {@link AbstractFileSystem#createFileSystem(URI, Configuration)}.
-   * 
+   *
    * @param theUri which must be that of ViewFs
    * @param theUri which must be that of ViewFs
    * @param conf
    * @param conf
    * @throws IOException
    * @throws IOException
-   * @throws URISyntaxException 
+   * @throws URISyntaxException
    */
    */
   ViewFs(final URI theUri, final Configuration conf) throws IOException,
   ViewFs(final URI theUri, final Configuration conf) throws IOException,
       URISyntaxException {
       URISyntaxException {
@@ -292,7 +292,7 @@ public class ViewFs extends AbstractFileSystem {
   @Override
   @Override
   @Deprecated
   @Deprecated
   public FsServerDefaults getServerDefaults() throws IOException {
   public FsServerDefaults getServerDefaults() throws IOException {
-    return LocalConfigKeys.getServerDefaults(); 
+    return LocalConfigKeys.getServerDefaults();
   }
   }
 
 
   @Override
   @Override
@@ -310,7 +310,7 @@ public class ViewFs extends AbstractFileSystem {
   public int getUriDefaultPort() {
   public int getUriDefaultPort() {
     return -1;
     return -1;
   }
   }
- 
+
   @Override
   @Override
   public Path getHomeDirectory() {
   public Path getHomeDirectory() {
     if (homeDir == null) {
     if (homeDir == null) {
@@ -318,13 +318,13 @@ public class ViewFs extends AbstractFileSystem {
       if (base == null) {
       if (base == null) {
         base = "/user";
         base = "/user";
       }
       }
-      homeDir = (base.equals("/") ? 
+      homeDir = (base.equals("/") ?
         this.makeQualified(new Path(base + ugi.getShortUserName())):
         this.makeQualified(new Path(base + ugi.getShortUserName())):
         this.makeQualified(new Path(base + "/" + ugi.getShortUserName())));
         this.makeQualified(new Path(base + "/" + ugi.getShortUserName())));
     }
     }
     return homeDir;
     return homeDir;
   }
   }
-  
+
   @Override
   @Override
   public Path resolvePath(final Path f) throws FileNotFoundException,
   public Path resolvePath(final Path f) throws FileNotFoundException,
           AccessControlException, UnresolvedLinkException, IOException {
           AccessControlException, UnresolvedLinkException, IOException {
@@ -336,7 +336,7 @@ public class ViewFs extends AbstractFileSystem {
     return res.targetFileSystem.resolvePath(res.remainingPath);
     return res.targetFileSystem.resolvePath(res.remainingPath);
 
 
   }
   }
-  
+
   @Override
   @Override
   public FSDataOutputStream createInternal(final Path f,
   public FSDataOutputStream createInternal(final Path f,
       final EnumSet<CreateFlag> flag, final FsPermission absolutePermission,
       final EnumSet<CreateFlag> flag, final FsPermission absolutePermission,
@@ -367,7 +367,7 @@ public class ViewFs extends AbstractFileSystem {
   public boolean delete(final Path f, final boolean recursive)
   public boolean delete(final Path f, final boolean recursive)
       throws AccessControlException, FileNotFoundException,
       throws AccessControlException, FileNotFoundException,
       UnresolvedLinkException, IOException {
       UnresolvedLinkException, IOException {
-    InodeTree.ResolveResult<AbstractFileSystem> res = 
+    InodeTree.ResolveResult<AbstractFileSystem> res =
       fsState.resolve(getUriPath(f), true);
       fsState.resolve(getUriPath(f), true);
     // If internal dir or target is a mount link (ie remainingPath is Slash)
     // If internal dir or target is a mount link (ie remainingPath is Slash)
     if (res.isInternalDir() || res.remainingPath == InodeTree.SlashPath) {
     if (res.isInternalDir() || res.remainingPath == InodeTree.SlashPath) {
@@ -381,7 +381,7 @@ public class ViewFs extends AbstractFileSystem {
   public BlockLocation[] getFileBlockLocations(final Path f, final long start,
   public BlockLocation[] getFileBlockLocations(final Path f, final long start,
       final long len) throws AccessControlException, FileNotFoundException,
       final long len) throws AccessControlException, FileNotFoundException,
       UnresolvedLinkException, IOException {
       UnresolvedLinkException, IOException {
-    InodeTree.ResolveResult<AbstractFileSystem> res = 
+    InodeTree.ResolveResult<AbstractFileSystem> res =
       fsState.resolve(getUriPath(f), true);
       fsState.resolve(getUriPath(f), true);
     return
     return
       res.targetFileSystem.getFileBlockLocations(res.remainingPath, start, len);
       res.targetFileSystem.getFileBlockLocations(res.remainingPath, start, len);
@@ -391,7 +391,7 @@ public class ViewFs extends AbstractFileSystem {
   public FileChecksum getFileChecksum(final Path f)
   public FileChecksum getFileChecksum(final Path f)
       throws AccessControlException, FileNotFoundException,
       throws AccessControlException, FileNotFoundException,
       UnresolvedLinkException, IOException {
       UnresolvedLinkException, IOException {
-    InodeTree.ResolveResult<AbstractFileSystem> res = 
+    InodeTree.ResolveResult<AbstractFileSystem> res =
       fsState.resolve(getUriPath(f), true);
       fsState.resolve(getUriPath(f), true);
     return res.targetFileSystem.getFileChecksum(res.remainingPath);
     return res.targetFileSystem.getFileChecksum(res.remainingPath);
   }
   }
@@ -407,20 +407,20 @@ public class ViewFs extends AbstractFileSystem {
   @Override
   @Override
   public FileStatus getFileStatus(final Path f) throws AccessControlException,
   public FileStatus getFileStatus(final Path f) throws AccessControlException,
       FileNotFoundException, UnresolvedLinkException, IOException {
       FileNotFoundException, UnresolvedLinkException, IOException {
-    InodeTree.ResolveResult<AbstractFileSystem> res = 
+    InodeTree.ResolveResult<AbstractFileSystem> res =
       fsState.resolve(getUriPath(f), true);
       fsState.resolve(getUriPath(f), true);
 
 
-    //  FileStatus#getPath is a fully qualified path relative to the root of 
+    //  FileStatus#getPath is a fully qualified path relative to the root of
     // target file system.
     // target file system.
     // We need to change it to viewfs URI - relative to root of mount table.
     // We need to change it to viewfs URI - relative to root of mount table.
-    
+
     // The implementors of RawLocalFileSystem were trying to be very smart.
     // The implementors of RawLocalFileSystem were trying to be very smart.
     // They implement FileStatus#getOwener lazily -- the object
     // They implement FileStatus#getOwener lazily -- the object
     // returned is really a RawLocalFileSystem that expect the
     // returned is really a RawLocalFileSystem that expect the
     // FileStatus#getPath to be unchanged so that it can get owner when needed.
     // FileStatus#getPath to be unchanged so that it can get owner when needed.
     // Hence we need to interpose a new ViewFsFileStatus that works around.
     // Hence we need to interpose a new ViewFsFileStatus that works around.
-    
-    
+
+
     FileStatus status =  res.targetFileSystem.getFileStatus(res.remainingPath);
     FileStatus status =  res.targetFileSystem.getFileStatus(res.remainingPath);
     return new ViewFsFileStatus(status, this.makeQualified(f));
     return new ViewFsFileStatus(status, this.makeQualified(f));
   }
   }
@@ -437,11 +437,11 @@ public class ViewFs extends AbstractFileSystem {
   public FileStatus getFileLinkStatus(final Path f)
   public FileStatus getFileLinkStatus(final Path f)
      throws AccessControlException, FileNotFoundException,
      throws AccessControlException, FileNotFoundException,
      UnsupportedFileSystemException, IOException {
      UnsupportedFileSystemException, IOException {
-    InodeTree.ResolveResult<AbstractFileSystem> res = 
+    InodeTree.ResolveResult<AbstractFileSystem> res =
       fsState.resolve(getUriPath(f), false); // do not follow mount link
       fsState.resolve(getUriPath(f), false); // do not follow mount link
     return res.targetFileSystem.getFileLinkStatus(res.remainingPath);
     return res.targetFileSystem.getFileLinkStatus(res.remainingPath);
   }
   }
-  
+
   @Override
   @Override
   public FsStatus getFsStatus() throws AccessControlException,
   public FsStatus getFsStatus() throws AccessControlException,
       FileNotFoundException, IOException {
       FileNotFoundException, IOException {
@@ -488,7 +488,7 @@ public class ViewFs extends AbstractFileSystem {
       }
       }
     };
     };
   }
   }
-  
+
   /**
   /**
    * {@inheritDoc}
    * {@inheritDoc}
    *
    *
@@ -520,7 +520,7 @@ public class ViewFs extends AbstractFileSystem {
       FileNotFoundException, UnresolvedLinkException, IOException {
       FileNotFoundException, UnresolvedLinkException, IOException {
     InodeTree.ResolveResult<AbstractFileSystem> res =
     InodeTree.ResolveResult<AbstractFileSystem> res =
       fsState.resolve(getUriPath(f), true);
       fsState.resolve(getUriPath(f), true);
-    
+
     FileStatus[] statusLst = res.targetFileSystem.listStatus(res.remainingPath);
     FileStatus[] statusLst = res.targetFileSystem.listStatus(res.remainingPath);
     if (!res.isInternalDir()) {
     if (!res.isInternalDir()) {
       // We need to change the name in the FileStatus as described in
       // We need to change the name in the FileStatus as described in
@@ -542,7 +542,7 @@ public class ViewFs extends AbstractFileSystem {
       final boolean createParent) throws AccessControlException,
       final boolean createParent) throws AccessControlException,
       FileAlreadyExistsException,
       FileAlreadyExistsException,
       FileNotFoundException, UnresolvedLinkException, IOException {
       FileNotFoundException, UnresolvedLinkException, IOException {
-    InodeTree.ResolveResult<AbstractFileSystem> res = 
+    InodeTree.ResolveResult<AbstractFileSystem> res =
       fsState.resolve(getUriPath(dir), false);
       fsState.resolve(getUriPath(dir), false);
     res.targetFileSystem.mkdir(res.remainingPath, permission, createParent);
     res.targetFileSystem.mkdir(res.remainingPath, permission, createParent);
   }
   }
@@ -551,7 +551,7 @@ public class ViewFs extends AbstractFileSystem {
   public FSDataInputStream open(final Path f, final int bufferSize)
   public FSDataInputStream open(final Path f, final int bufferSize)
       throws AccessControlException, FileNotFoundException,
       throws AccessControlException, FileNotFoundException,
       UnresolvedLinkException, IOException {
       UnresolvedLinkException, IOException {
-    InodeTree.ResolveResult<AbstractFileSystem> res = 
+    InodeTree.ResolveResult<AbstractFileSystem> res =
         fsState.resolve(getUriPath(f), true);
         fsState.resolve(getUriPath(f), true);
     return res.targetFileSystem.open(res.remainingPath, bufferSize);
     return res.targetFileSystem.open(res.remainingPath, bufferSize);
   }
   }
@@ -568,7 +568,7 @@ public class ViewFs extends AbstractFileSystem {
   @Override
   @Override
   public void renameInternal(final Path src, final Path dst,
   public void renameInternal(final Path src, final Path dst,
       final boolean overwrite) throws IOException, UnresolvedLinkException {
       final boolean overwrite) throws IOException, UnresolvedLinkException {
-    // passing resolveLastComponet as false to catch renaming a mount point 
+    // passing resolveLastComponet as false to catch renaming a mount point
     // itself we need to catch this as an internal operation and fail if no
     // itself we need to catch this as an internal operation and fail if no
     // fallback.
     // fallback.
     InodeTree.ResolveResult<AbstractFileSystem> resSrc =
     InodeTree.ResolveResult<AbstractFileSystem> resSrc =
@@ -642,12 +642,12 @@ public class ViewFs extends AbstractFileSystem {
       UnresolvedLinkException, IOException {
       UnresolvedLinkException, IOException {
     renameInternal(src, dst, false);
     renameInternal(src, dst, false);
   }
   }
-  
+
   @Override
   @Override
   public boolean supportsSymlinks() {
   public boolean supportsSymlinks() {
     return true;
     return true;
   }
   }
-  
+
   @Override
   @Override
   public void createSymlink(final Path target, final Path link,
   public void createSymlink(final Path target, final Path link,
       final boolean createParent) throws IOException, UnresolvedLinkException {
       final boolean createParent) throws IOException, UnresolvedLinkException {
@@ -663,12 +663,12 @@ public class ViewFs extends AbstractFileSystem {
     }
     }
     assert(res.remainingPath != null);
     assert(res.remainingPath != null);
     res.targetFileSystem.createSymlink(target, res.remainingPath,
     res.targetFileSystem.createSymlink(target, res.remainingPath,
-        createParent);  
+        createParent);
   }
   }
 
 
   @Override
   @Override
   public Path getLinkTarget(final Path f) throws IOException {
   public Path getLinkTarget(final Path f) throws IOException {
-    InodeTree.ResolveResult<AbstractFileSystem> res = 
+    InodeTree.ResolveResult<AbstractFileSystem> res =
       fsState.resolve(getUriPath(f), false); // do not follow mount link
       fsState.resolve(getUriPath(f), false); // do not follow mount link
     return res.targetFileSystem.getLinkTarget(res.remainingPath);
     return res.targetFileSystem.getLinkTarget(res.remainingPath);
   }
   }
@@ -677,26 +677,26 @@ public class ViewFs extends AbstractFileSystem {
   public void setOwner(final Path f, final String username,
   public void setOwner(final Path f, final String username,
       final String groupname) throws AccessControlException,
       final String groupname) throws AccessControlException,
       FileNotFoundException, UnresolvedLinkException, IOException {
       FileNotFoundException, UnresolvedLinkException, IOException {
-    InodeTree.ResolveResult<AbstractFileSystem> res = 
+    InodeTree.ResolveResult<AbstractFileSystem> res =
       fsState.resolve(getUriPath(f), true);
       fsState.resolve(getUriPath(f), true);
-    res.targetFileSystem.setOwner(res.remainingPath, username, groupname); 
+    res.targetFileSystem.setOwner(res.remainingPath, username, groupname);
   }
   }
 
 
   @Override
   @Override
   public void setPermission(final Path f, final FsPermission permission)
   public void setPermission(final Path f, final FsPermission permission)
       throws AccessControlException, FileNotFoundException,
       throws AccessControlException, FileNotFoundException,
       UnresolvedLinkException, IOException {
       UnresolvedLinkException, IOException {
-    InodeTree.ResolveResult<AbstractFileSystem> res = 
+    InodeTree.ResolveResult<AbstractFileSystem> res =
       fsState.resolve(getUriPath(f), true);
       fsState.resolve(getUriPath(f), true);
-    res.targetFileSystem.setPermission(res.remainingPath, permission); 
-    
+    res.targetFileSystem.setPermission(res.remainingPath, permission);
+
   }
   }
 
 
   @Override
   @Override
   public boolean setReplication(final Path f, final short replication)
   public boolean setReplication(final Path f, final short replication)
       throws AccessControlException, FileNotFoundException,
       throws AccessControlException, FileNotFoundException,
       UnresolvedLinkException, IOException {
       UnresolvedLinkException, IOException {
-    InodeTree.ResolveResult<AbstractFileSystem> res = 
+    InodeTree.ResolveResult<AbstractFileSystem> res =
       fsState.resolve(getUriPath(f), true);
       fsState.resolve(getUriPath(f), true);
     return res.targetFileSystem.setReplication(res.remainingPath, replication);
     return res.targetFileSystem.setReplication(res.remainingPath, replication);
   }
   }
@@ -705,41 +705,41 @@ public class ViewFs extends AbstractFileSystem {
   public void setTimes(final Path f, final long mtime, final long atime)
   public void setTimes(final Path f, final long mtime, final long atime)
       throws AccessControlException, FileNotFoundException,
       throws AccessControlException, FileNotFoundException,
       UnresolvedLinkException, IOException {
       UnresolvedLinkException, IOException {
-    InodeTree.ResolveResult<AbstractFileSystem> res = 
+    InodeTree.ResolveResult<AbstractFileSystem> res =
       fsState.resolve(getUriPath(f), true);
       fsState.resolve(getUriPath(f), true);
-    res.targetFileSystem.setTimes(res.remainingPath, mtime, atime); 
+    res.targetFileSystem.setTimes(res.remainingPath, mtime, atime);
   }
   }
 
 
   @Override
   @Override
   public void setVerifyChecksum(final boolean verifyChecksum)
   public void setVerifyChecksum(final boolean verifyChecksum)
       throws AccessControlException, IOException {
       throws AccessControlException, IOException {
-    // This is a file system level operations, however ViewFs 
-    // points to many file systems. Noop for ViewFs. 
+    // This is a file system level operations, however ViewFs
+    // points to many file systems. Noop for ViewFs.
   }
   }
-  
+
   public MountPoint[] getMountPoints() {
   public MountPoint[] getMountPoints() {
-    List<InodeTree.MountPoint<AbstractFileSystem>> mountPoints = 
+    List<InodeTree.MountPoint<AbstractFileSystem>> mountPoints =
                   fsState.getMountPoints();
                   fsState.getMountPoints();
-    
+
     MountPoint[] result = new MountPoint[mountPoints.size()];
     MountPoint[] result = new MountPoint[mountPoints.size()];
     for ( int i = 0; i < mountPoints.size(); ++i ) {
     for ( int i = 0; i < mountPoints.size(); ++i ) {
-      result[i] = new MountPoint(new Path(mountPoints.get(i).src), 
+      result[i] = new MountPoint(new Path(mountPoints.get(i).src),
                               mountPoints.get(i).target.targetDirLinkList);
                               mountPoints.get(i).target.targetDirLinkList);
     }
     }
     return result;
     return result;
   }
   }
-  
+
   @Override
   @Override
   public List<Token<?>> getDelegationTokens(String renewer) throws IOException {
   public List<Token<?>> getDelegationTokens(String renewer) throws IOException {
-    List<InodeTree.MountPoint<AbstractFileSystem>> mountPoints = 
+    List<InodeTree.MountPoint<AbstractFileSystem>> mountPoints =
                 fsState.getMountPoints();
                 fsState.getMountPoints();
     int initialListSize  = 0;
     int initialListSize  = 0;
     for (InodeTree.MountPoint<AbstractFileSystem> im : mountPoints) {
     for (InodeTree.MountPoint<AbstractFileSystem> im : mountPoints) {
-      initialListSize += im.target.targetDirLinkList.length; 
+      initialListSize += im.target.targetDirLinkList.length;
     }
     }
     List<Token<?>> result = new ArrayList<Token<?>>(initialListSize);
     List<Token<?>> result = new ArrayList<Token<?>>(initialListSize);
     for ( int i = 0; i < mountPoints.size(); ++i ) {
     for ( int i = 0; i < mountPoints.size(); ++i ) {
-      List<Token<?>> tokens = 
+      List<Token<?>> tokens =
           mountPoints.get(i).target.getTargetFileSystem()
           mountPoints.get(i).target.getTargetFileSystem()
               .getDelegationTokens(renewer);
               .getDelegationTokens(renewer);
       if (tokens != null) {
       if (tokens != null) {
@@ -955,18 +955,18 @@ public class ViewFs extends AbstractFileSystem {
   }
   }
 
 
   /*
   /*
-   * An instance of this class represents an internal dir of the viewFs 
+   * An instance of this class represents an internal dir of the viewFs
    * ie internal dir of the mount table.
    * ie internal dir of the mount table.
    * It is a ready only mount tbale and create, mkdir or delete operations
    * It is a ready only mount tbale and create, mkdir or delete operations
    * are not allowed.
    * are not allowed.
    * If called on create or mkdir then this target is the parent of the
    * If called on create or mkdir then this target is the parent of the
    * directory in which one is trying to create or mkdir; hence
    * directory in which one is trying to create or mkdir; hence
-   * in this case the path name passed in is the last component. 
+   * in this case the path name passed in is the last component.
    * Otherwise this target is the end point of the path and hence
    * Otherwise this target is the end point of the path and hence
-   * the path name passed in is null. 
+   * the path name passed in is null.
    */
    */
   static class InternalDirOfViewFs extends AbstractFileSystem {
   static class InternalDirOfViewFs extends AbstractFileSystem {
-    
+
     final InodeTree.INodeDir<AbstractFileSystem>  theInternalDir;
     final InodeTree.INodeDir<AbstractFileSystem>  theInternalDir;
     final long creationTime; // of the the mount table
     final long creationTime; // of the the mount table
     final UserGroupInformation ugi; // the user/group of user who created mtable
     final UserGroupInformation ugi; // the user/group of user who created mtable
@@ -1085,7 +1085,7 @@ public class ViewFs extends AbstractFileSystem {
           new Path(theInternalDir.fullPath).makeQualified(
           new Path(theInternalDir.fullPath).makeQualified(
               myUri, null));
               myUri, null));
     }
     }
-    
+
     @Override
     @Override
     public FileStatus getFileLinkStatus(final Path f)
     public FileStatus getFileLinkStatus(final Path f)
         throws IOException {
         throws IOException {
@@ -1098,8 +1098,7 @@ public class ViewFs extends AbstractFileSystem {
       }
       }
       FileStatus result;
       FileStatus result;
       if (inode.isLink()) {
       if (inode.isLink()) {
-        INodeLink<AbstractFileSystem> inodelink = 
-          (INodeLink<AbstractFileSystem>) inode;
+        INodeLink<AbstractFileSystem> inodelink = inode.getLink();
         try {
         try {
           String linkedPath = inodelink.getTargetFileSystem()
           String linkedPath = inodelink.getTargetFileSystem()
               .getUri().getPath();
               .getUri().getPath();
@@ -1127,7 +1126,7 @@ public class ViewFs extends AbstractFileSystem {
       }
       }
       return result;
       return result;
     }
     }
-    
+
     @Override
     @Override
     public FsStatus getFsStatus() {
     public FsStatus getFsStatus() {
       return new FsStatus(0, 0, 0);
       return new FsStatus(0, 0, 0);
@@ -1169,8 +1168,7 @@ public class ViewFs extends AbstractFileSystem {
         INode<AbstractFileSystem> inode = iEntry.getValue();
         INode<AbstractFileSystem> inode = iEntry.getValue();
         Path path = new Path(inode.fullPath).makeQualified(myUri, null);
         Path path = new Path(inode.fullPath).makeQualified(myUri, null);
         if (inode.isLink()) {
         if (inode.isLink()) {
-          INodeLink<AbstractFileSystem> link = 
-            (INodeLink<AbstractFileSystem>) inode;
+          INodeLink<AbstractFileSystem> link = inode.getLink();
 
 
           if (showMountLinksAsSymlinks) {
           if (showMountLinksAsSymlinks) {
             // To maintain backward compatibility, with default option(showing
             // To maintain backward compatibility, with default option(showing
@@ -1319,18 +1317,18 @@ public class ViewFs extends AbstractFileSystem {
         throws AccessControlException, IOException {
         throws AccessControlException, IOException {
       checkPathIsSlash(src);
       checkPathIsSlash(src);
       checkPathIsSlash(dst);
       checkPathIsSlash(dst);
-      throw readOnlyMountTable("rename", src);     
+      throw readOnlyMountTable("rename", src);
     }
     }
 
 
     @Override
     @Override
     public boolean supportsSymlinks() {
     public boolean supportsSymlinks() {
       return true;
       return true;
     }
     }
-    
+
     @Override
     @Override
     public void createSymlink(final Path target, final Path link,
     public void createSymlink(final Path target, final Path link,
         final boolean createParent) throws AccessControlException {
         final boolean createParent) throws AccessControlException {
-      throw readOnlyMountTable("createSymlink", link);    
+      throw readOnlyMountTable("createSymlink", link);
     }
     }
 
 
     @Override
     @Override
@@ -1350,7 +1348,7 @@ public class ViewFs extends AbstractFileSystem {
     public void setPermission(final Path f, final FsPermission permission)
     public void setPermission(final Path f, final FsPermission permission)
         throws AccessControlException, IOException {
         throws AccessControlException, IOException {
       checkPathIsSlash(f);
       checkPathIsSlash(f);
-      throw readOnlyMountTable("setPermission", f);    
+      throw readOnlyMountTable("setPermission", f);
     }
     }
 
 
     @Override
     @Override
@@ -1364,13 +1362,13 @@ public class ViewFs extends AbstractFileSystem {
     public void setTimes(final Path f, final long mtime, final long atime)
     public void setTimes(final Path f, final long mtime, final long atime)
         throws AccessControlException, IOException {
         throws AccessControlException, IOException {
       checkPathIsSlash(f);
       checkPathIsSlash(f);
-      throw readOnlyMountTable("setTimes", f);    
+      throw readOnlyMountTable("setTimes", f);
     }
     }
 
 
     @Override
     @Override
     public void setVerifyChecksum(final boolean verifyChecksum)
     public void setVerifyChecksum(final boolean verifyChecksum)
         throws AccessControlException {
         throws AccessControlException {
-      throw readOnlyMountTable("setVerifyChecksum", "");   
+      throw readOnlyMountTable("setVerifyChecksum", "");
     }
     }
 
 
     @Override
     @Override

+ 365 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestNestedMountPoint.java

@@ -0,0 +1,365 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.viewfs;
+
+import java.net.URI;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsConstants;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+/**
+ * Unit test of nested mount point support in INodeTree
+ */
+public class TestNestedMountPoint {
+  private InodeTree inodeTree;
+  private Configuration conf;
+  private String mtName;
+  private URI fsUri;
+
+  static class TestNestMountPointFileSystem {
+    public URI getUri() {
+      return uri;
+    }
+
+    private URI uri;
+
+    TestNestMountPointFileSystem(URI uri) {
+      this.uri = uri;
+    }
+  }
+
+  static class TestNestMountPointInternalFileSystem extends TestNestMountPointFileSystem {
+    TestNestMountPointInternalFileSystem(URI uri) {
+      super(uri);
+    }
+  }
+
+  private static final URI LINKFALLBACK_TARGET = URI.create("hdfs://nn00");
+  private static final URI NN1_TARGET = URI.create("hdfs://nn01/a/b");
+  private static final URI NN2_TARGET = URI.create("hdfs://nn02/a/b/e");
+  private static final URI NN3_TARGET = URI.create("hdfs://nn03/a/b/c/d");
+  private static final URI NN4_TARGET = URI.create("hdfs://nn04/a/b/c/d/e");
+  private static final URI NN5_TARGET = URI.create("hdfs://nn05/b/c/d/e");
+  private static final URI NN6_TARGET = URI.create("hdfs://nn06/b/c/d/e/f");
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new Configuration();
+    mtName = TestNestedMountPoint.class.getName();
+    ConfigUtil.setIsNestedMountPointSupported(conf, true);
+    ConfigUtil.addLink(conf, mtName, "/a/b", NN1_TARGET);
+    ConfigUtil.addLink(conf, mtName, "/a/b/e", NN2_TARGET);
+    ConfigUtil.addLink(conf, mtName, "/a/b/c/d", NN3_TARGET);
+    ConfigUtil.addLink(conf, mtName, "/a/b/c/d/e", NN4_TARGET);
+    ConfigUtil.addLink(conf, mtName, "/b/c/d/e", NN5_TARGET);
+    ConfigUtil.addLink(conf, mtName, "/b/c/d/e/f", NN6_TARGET);
+    ConfigUtil.addLinkFallback(conf, mtName, LINKFALLBACK_TARGET);
+
+    fsUri = new URI(FsConstants.VIEWFS_SCHEME, mtName, "/", null, null);
+
+    inodeTree = new InodeTree<TestNestedMountPoint.TestNestMountPointFileSystem>(conf,
+        mtName, fsUri, false) {
+      @Override
+      protected Function<URI, TestNestedMountPoint.TestNestMountPointFileSystem> initAndGetTargetFs() {
+        return new Function<URI, TestNestMountPointFileSystem>() {
+          @Override
+          public TestNestedMountPoint.TestNestMountPointFileSystem apply(URI uri) {
+            return new TestNestMountPointFileSystem(uri);
+          }
+        };
+      }
+
+      // For intenral dir fs
+      @Override
+      protected TestNestedMountPoint.TestNestMountPointInternalFileSystem getTargetFileSystem(
+          final INodeDir<TestNestedMountPoint.TestNestMountPointFileSystem> dir) {
+        return new TestNestMountPointInternalFileSystem(fsUri);
+      }
+
+      @Override
+      protected TestNestedMountPoint.TestNestMountPointInternalFileSystem getTargetFileSystem(
+          final String settings, final URI[] mergeFsURIList) {
+        return new TestNestMountPointInternalFileSystem(null);
+      }
+    };
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    inodeTree = null;
+  }
+
+  @Test
+  public void testPathResolveToLink() throws Exception {
+    // /a/b/c/d/e/f resolves to /a/b/c/d/e and /f
+    InodeTree.ResolveResult resolveResult = inodeTree.resolve("/a/b/c/d/e/f", true);
+    Assert.assertEquals(InodeTree.ResultKind.EXTERNAL_DIR, resolveResult.kind);
+    Assert.assertEquals("/a/b/c/d/e", resolveResult.resolvedPath);
+    Assert.assertEquals(new Path("/f"), resolveResult.remainingPath);
+    Assert.assertTrue(resolveResult.targetFileSystem instanceof TestNestMountPointFileSystem);
+    Assert.assertEquals(NN4_TARGET, ((TestNestMountPointFileSystem) resolveResult.targetFileSystem).getUri());
+    Assert.assertTrue(resolveResult.isLastInternalDirLink());
+
+    // /a/b/c/d/e resolves to /a/b/c/d/e and /
+    InodeTree.ResolveResult resolveResult2 = inodeTree.resolve("/a/b/c/d/e", true);
+    Assert.assertEquals(InodeTree.ResultKind.EXTERNAL_DIR, resolveResult2.kind);
+    Assert.assertEquals("/a/b/c/d/e", resolveResult2.resolvedPath);
+    Assert.assertEquals(new Path("/"), resolveResult2.remainingPath);
+    Assert.assertTrue(resolveResult2.targetFileSystem instanceof TestNestMountPointFileSystem);
+    Assert.assertEquals(NN4_TARGET, ((TestNestMountPointFileSystem) resolveResult2.targetFileSystem).getUri());
+    Assert.assertTrue(resolveResult2.isLastInternalDirLink());
+
+    // /a/b/c/d/e/f/g/h/i resolves to /a/b/c/d/e and /f/g/h/i
+    InodeTree.ResolveResult resolveResult3 = inodeTree.resolve("/a/b/c/d/e/f/g/h/i", true);
+    Assert.assertEquals(InodeTree.ResultKind.EXTERNAL_DIR, resolveResult3.kind);
+    Assert.assertEquals("/a/b/c/d/e", resolveResult3.resolvedPath);
+    Assert.assertEquals(new Path("/f/g/h/i"), resolveResult3.remainingPath);
+    Assert.assertTrue(resolveResult3.targetFileSystem instanceof TestNestMountPointFileSystem);
+    Assert.assertEquals(NN4_TARGET, ((TestNestMountPointFileSystem) resolveResult3.targetFileSystem).getUri());
+    Assert.assertTrue(resolveResult3.isLastInternalDirLink());
+  }
+
+  @Test
+  public void testPathResolveToLinkNotResolveLastComponent() throws Exception {
+    // /a/b/c/d/e/f resolves to /a/b/c/d/e and /f
+    InodeTree.ResolveResult resolveResult = inodeTree.resolve("/a/b/c/d/e/f", false);
+    Assert.assertEquals(InodeTree.ResultKind.EXTERNAL_DIR, resolveResult.kind);
+    Assert.assertEquals("/a/b/c/d/e", resolveResult.resolvedPath);
+    Assert.assertEquals(new Path("/f"), resolveResult.remainingPath);
+    Assert.assertTrue(resolveResult.targetFileSystem instanceof TestNestMountPointFileSystem);
+    Assert.assertEquals(NN4_TARGET, ((TestNestMountPointFileSystem) resolveResult.targetFileSystem).getUri());
+    Assert.assertTrue(resolveResult.isLastInternalDirLink());
+
+    // /a/b/c/d/e resolves to /a/b/c/d and /e
+    InodeTree.ResolveResult resolveResult2 = inodeTree.resolve("/a/b/c/d/e", false);
+    Assert.assertEquals(InodeTree.ResultKind.EXTERNAL_DIR, resolveResult2.kind);
+    Assert.assertEquals("/a/b/c/d", resolveResult2.resolvedPath);
+    Assert.assertEquals(new Path("/e"), resolveResult2.remainingPath);
+    Assert.assertTrue(resolveResult2.targetFileSystem instanceof TestNestMountPointFileSystem);
+    Assert.assertEquals(NN3_TARGET, ((TestNestMountPointFileSystem) resolveResult2.targetFileSystem).getUri());
+    Assert.assertTrue(resolveResult2.isLastInternalDirLink());
+
+    // /a/b/c/d/e/f/g/h/i resolves to /a/b/c/d/e and /f/g/h/i
+    InodeTree.ResolveResult resolveResult3 = inodeTree.resolve("/a/b/c/d/e/f/g/h/i", false);
+    Assert.assertEquals(InodeTree.ResultKind.EXTERNAL_DIR, resolveResult3.kind);
+    Assert.assertEquals("/a/b/c/d/e", resolveResult3.resolvedPath);
+    Assert.assertEquals(new Path("/f/g/h/i"), resolveResult3.remainingPath);
+    Assert.assertTrue(resolveResult3.targetFileSystem instanceof TestNestMountPointFileSystem);
+    Assert.assertEquals(NN4_TARGET, ((TestNestMountPointFileSystem) resolveResult3.targetFileSystem).getUri());
+    Assert.assertTrue(resolveResult3.isLastInternalDirLink());
+  }
+
+  @Test
+  public void testPathResolveToDirLink() throws Exception {
+    // /a/b/c/d/f resolves to /a/b/c/d, /f
+    InodeTree.ResolveResult resolveResult = inodeTree.resolve("/a/b/c/d/f", true);
+    Assert.assertEquals(InodeTree.ResultKind.EXTERNAL_DIR, resolveResult.kind);
+    Assert.assertEquals("/a/b/c/d", resolveResult.resolvedPath);
+    Assert.assertEquals(new Path("/f"), resolveResult.remainingPath);
+    Assert.assertTrue(resolveResult.targetFileSystem instanceof TestNestMountPointFileSystem);
+    Assert.assertEquals(NN3_TARGET, ((TestNestMountPointFileSystem) resolveResult.targetFileSystem).getUri());
+    Assert.assertTrue(resolveResult.isLastInternalDirLink());
+
+    // /a/b/c/d resolves to /a/b/c/d and /
+    InodeTree.ResolveResult resolveResult2 = inodeTree.resolve("/a/b/c/d", true);
+    Assert.assertEquals(InodeTree.ResultKind.EXTERNAL_DIR, resolveResult2.kind);
+    Assert.assertEquals("/a/b/c/d", resolveResult2.resolvedPath);
+    Assert.assertEquals(new Path("/"), resolveResult2.remainingPath);
+    Assert.assertTrue(resolveResult2.targetFileSystem instanceof TestNestMountPointFileSystem);
+    Assert.assertEquals(NN3_TARGET, ((TestNestMountPointFileSystem) resolveResult2.targetFileSystem).getUri());
+    Assert.assertTrue(resolveResult2.isLastInternalDirLink());
+
+    // /a/b/c/d/f/g/h/i resolves to /a/b/c/d and /f/g/h/i
+    InodeTree.ResolveResult resolveResult3 = inodeTree.resolve("/a/b/c/d/f/g/h/i", true);
+    Assert.assertEquals(InodeTree.ResultKind.EXTERNAL_DIR, resolveResult3.kind);
+    Assert.assertEquals("/a/b/c/d", resolveResult3.resolvedPath);
+    Assert.assertEquals(new Path("/f/g/h/i"), resolveResult3.remainingPath);
+    Assert.assertTrue(resolveResult3.targetFileSystem instanceof TestNestMountPointFileSystem);
+    Assert.assertEquals(NN3_TARGET, ((TestNestMountPointFileSystem) resolveResult3.targetFileSystem).getUri());
+    Assert.assertTrue(resolveResult3.isLastInternalDirLink());
+  }
+
+  @Test
+  public void testPathResolveToDirLinkNotResolveLastComponent() throws Exception {
+    // /a/b/c/d/f resolves to /a/b/c/d, /f
+    InodeTree.ResolveResult resolveResult = inodeTree.resolve("/a/b/c/d/f", false);
+    Assert.assertEquals(InodeTree.ResultKind.EXTERNAL_DIR, resolveResult.kind);
+    Assert.assertEquals("/a/b/c/d", resolveResult.resolvedPath);
+    Assert.assertEquals(new Path("/f"), resolveResult.remainingPath);
+    Assert.assertTrue(resolveResult.targetFileSystem instanceof TestNestMountPointFileSystem);
+    Assert.assertEquals(NN3_TARGET, ((TestNestMountPointFileSystem) resolveResult.targetFileSystem).getUri());
+    Assert.assertTrue(resolveResult.isLastInternalDirLink());
+
+    // /a/b/c/d resolves to /a/b and /c/d
+    InodeTree.ResolveResult resolveResult2 = inodeTree.resolve("/a/b/c/d", false);
+    Assert.assertEquals(InodeTree.ResultKind.EXTERNAL_DIR, resolveResult2.kind);
+    Assert.assertEquals("/a/b", resolveResult2.resolvedPath);
+    Assert.assertEquals(new Path("/c/d"), resolveResult2.remainingPath);
+    Assert.assertTrue(resolveResult2.targetFileSystem instanceof TestNestMountPointFileSystem);
+    Assert.assertEquals(NN1_TARGET, ((TestNestMountPointFileSystem) resolveResult2.targetFileSystem).getUri());
+    Assert.assertTrue(resolveResult2.isLastInternalDirLink());
+
+    // /a/b/c/d/f/g/h/i resolves to /a/b/c/d and /f/g/h/i
+    InodeTree.ResolveResult resolveResult3 = inodeTree.resolve("/a/b/c/d/f/g/h/i", false);
+    Assert.assertEquals(InodeTree.ResultKind.EXTERNAL_DIR, resolveResult3.kind);
+    Assert.assertEquals("/a/b/c/d", resolveResult3.resolvedPath);
+    Assert.assertEquals(new Path("/f/g/h/i"), resolveResult3.remainingPath);
+    Assert.assertTrue(resolveResult3.targetFileSystem instanceof TestNestMountPointFileSystem);
+    Assert.assertEquals(NN3_TARGET, ((TestNestMountPointFileSystem) resolveResult3.targetFileSystem).getUri());
+    Assert.assertTrue(resolveResult3.isLastInternalDirLink());
+  }
+
+  @Test
+  public void testMultiNestedMountPointsPathResolveToDirLink() throws Exception {
+    // /a/b/f resolves to /a/b and /f
+    InodeTree.ResolveResult resolveResult = inodeTree.resolve("/a/b/f", true);
+    Assert.assertEquals(InodeTree.ResultKind.EXTERNAL_DIR, resolveResult.kind);
+    Assert.assertEquals("/a/b", resolveResult.resolvedPath);
+    Assert.assertEquals(new Path("/f"), resolveResult.remainingPath);
+    Assert.assertTrue(resolveResult.targetFileSystem instanceof TestNestMountPointFileSystem);
+    Assert.assertEquals(NN1_TARGET, ((TestNestMountPointFileSystem) resolveResult.targetFileSystem).getUri());
+    Assert.assertTrue(resolveResult.isLastInternalDirLink());
+
+    // /a/b resolves to /a/b and /
+    InodeTree.ResolveResult resolveResult2 = inodeTree.resolve("/a/b", true);
+    Assert.assertEquals(InodeTree.ResultKind.EXTERNAL_DIR, resolveResult2.kind);
+    Assert.assertEquals("/a/b", resolveResult2.resolvedPath);
+    Assert.assertEquals(new Path("/"), resolveResult2.remainingPath);
+    Assert.assertTrue(resolveResult2.targetFileSystem instanceof TestNestMountPointFileSystem);
+    Assert.assertEquals(NN1_TARGET, ((TestNestMountPointFileSystem) resolveResult2.targetFileSystem).getUri());
+    Assert.assertTrue(resolveResult2.isLastInternalDirLink());
+  }
+
+  @Test
+  public void testMultiNestedMountPointsPathResolveToDirLinkNotResolveLastComponent() throws Exception {
+    // /a/b/f resolves to /a/b and /f
+    InodeTree.ResolveResult resolveResult = inodeTree.resolve("/a/b/f", false);
+    Assert.assertEquals(InodeTree.ResultKind.EXTERNAL_DIR, resolveResult.kind);
+    Assert.assertEquals("/a/b", resolveResult.resolvedPath);
+    Assert.assertEquals(new Path("/f"), resolveResult.remainingPath);
+    Assert.assertTrue(resolveResult.targetFileSystem instanceof TestNestMountPointFileSystem);
+    Assert.assertEquals(NN1_TARGET, ((TestNestMountPointFileSystem) resolveResult.targetFileSystem).getUri());
+    Assert.assertTrue(resolveResult.isLastInternalDirLink());
+
+    // /a/b resolves to /a and /b
+    InodeTree.ResolveResult resolveResult2 = inodeTree.resolve("/a/b", false);
+    Assert.assertEquals(InodeTree.ResultKind.INTERNAL_DIR, resolveResult2.kind);
+    Assert.assertEquals("/a", resolveResult2.resolvedPath);
+    Assert.assertEquals(new Path("/b"), resolveResult2.remainingPath);
+    Assert.assertTrue(resolveResult2.targetFileSystem instanceof TestNestMountPointInternalFileSystem);
+    Assert.assertEquals(fsUri, ((TestNestMountPointInternalFileSystem) resolveResult2.targetFileSystem).getUri());
+    Assert.assertFalse(resolveResult2.isLastInternalDirLink());
+  }
+
+  @Test
+  public void testPathResolveToDirLinkLastComponentInternalDir() throws Exception {
+    // /a/b/c resolves to /a/b and /c
+    InodeTree.ResolveResult resolveResult = inodeTree.resolve("/a/b/c", true);
+    Assert.assertEquals(InodeTree.ResultKind.EXTERNAL_DIR, resolveResult.kind);
+    Assert.assertEquals("/a/b", resolveResult.resolvedPath);
+    Assert.assertEquals(new Path("/c"), resolveResult.remainingPath);
+    Assert.assertTrue(resolveResult.targetFileSystem instanceof TestNestMountPointFileSystem);
+    Assert.assertEquals(NN1_TARGET, ((TestNestMountPointFileSystem) resolveResult.targetFileSystem).getUri());
+    Assert.assertTrue(resolveResult.isLastInternalDirLink());
+  }
+
+  @Test
+  public void testPathResolveToDirLinkLastComponentInternalDirNotResolveLastComponent() throws Exception {
+    // /a/b/c resolves to /a/b and /c
+    InodeTree.ResolveResult resolveResult = inodeTree.resolve("/a/b/c", false);
+    Assert.assertEquals(InodeTree.ResultKind.EXTERNAL_DIR, resolveResult.kind);
+    Assert.assertEquals("/a/b", resolveResult.resolvedPath);
+    Assert.assertEquals(new Path("/c"), resolveResult.remainingPath);
+    Assert.assertTrue(resolveResult.targetFileSystem instanceof TestNestMountPointFileSystem);
+    Assert.assertEquals(NN1_TARGET, ((TestNestMountPointFileSystem) resolveResult.targetFileSystem).getUri());
+    Assert.assertTrue(resolveResult.isLastInternalDirLink());
+  }
+
+  @Test
+  public void testPathResolveToLinkFallBack() throws Exception {
+    // /a/e resolves to linkfallback
+    InodeTree.ResolveResult resolveResult = inodeTree.resolve("/a/e", true);
+    Assert.assertEquals(InodeTree.ResultKind.EXTERNAL_DIR, resolveResult.kind);
+    Assert.assertEquals("/", resolveResult.resolvedPath);
+    Assert.assertEquals(new Path("/a/e"), resolveResult.remainingPath);
+    Assert.assertTrue(resolveResult.targetFileSystem instanceof TestNestMountPointFileSystem);
+    Assert.assertEquals(LINKFALLBACK_TARGET, ((TestNestMountPointFileSystem) resolveResult.targetFileSystem).getUri());
+    Assert.assertFalse(resolveResult.isLastInternalDirLink());
+  }
+
+  @Test
+  public void testPathNotResolveToLinkFallBackNotResolveLastComponent() throws Exception {
+    // /a/e resolves to internalDir instead of linkfallback
+    InodeTree.ResolveResult resolveResult = inodeTree.resolve("/a/e", false);
+    Assert.assertEquals(InodeTree.ResultKind.INTERNAL_DIR, resolveResult.kind);
+    Assert.assertEquals("/a", resolveResult.resolvedPath);
+    Assert.assertEquals(new Path("/e"), resolveResult.remainingPath);
+    Assert.assertTrue(resolveResult.targetFileSystem instanceof TestNestMountPointInternalFileSystem);
+    Assert.assertEquals(fsUri, ((TestNestMountPointInternalFileSystem) resolveResult.targetFileSystem).getUri());
+    Assert.assertFalse(resolveResult.isLastInternalDirLink());
+  }
+
+  @Test
+  public void testPathResolveToInternalDir() throws Exception {
+    // /b/c resolves to internal dir
+    InodeTree.ResolveResult resolveResult = inodeTree.resolve("/b/c", true);
+    Assert.assertEquals(InodeTree.ResultKind.INTERNAL_DIR, resolveResult.kind);
+    Assert.assertEquals("/b/c", resolveResult.resolvedPath);
+    Assert.assertEquals(new Path("/"), resolveResult.remainingPath);
+    Assert.assertTrue(resolveResult.targetFileSystem instanceof TestNestMountPointInternalFileSystem);
+    Assert.assertEquals(fsUri, ((TestNestMountPointInternalFileSystem) resolveResult.targetFileSystem).getUri());
+    Assert.assertFalse(resolveResult.isLastInternalDirLink());
+  }
+
+  @Test
+  public void testPathResolveToInternalDirNotResolveLastComponent() throws Exception {
+    // /b/c resolves to internal dir
+    InodeTree.ResolveResult resolveResult = inodeTree.resolve("/b/c", false);
+    Assert.assertEquals(InodeTree.ResultKind.INTERNAL_DIR, resolveResult.kind);
+    Assert.assertEquals("/b", resolveResult.resolvedPath);
+    Assert.assertEquals(new Path("/c"), resolveResult.remainingPath);
+    Assert.assertTrue(resolveResult.targetFileSystem instanceof TestNestMountPointInternalFileSystem);
+    Assert.assertEquals(fsUri, ((TestNestMountPointInternalFileSystem) resolveResult.targetFileSystem).getUri());
+    Assert.assertFalse(resolveResult.isLastInternalDirLink());
+  }
+
+  @Test
+  public void testSlashResolveToInternalDir() throws Exception {
+    // / resolves to internal dir
+    InodeTree.ResolveResult resolveResult = inodeTree.resolve("/", true);
+    Assert.assertEquals(InodeTree.ResultKind.INTERNAL_DIR, resolveResult.kind);
+    Assert.assertEquals("/", resolveResult.resolvedPath);
+    Assert.assertEquals(new Path("/"), resolveResult.remainingPath);
+    Assert.assertTrue(resolveResult.targetFileSystem instanceof TestNestMountPointInternalFileSystem);
+    Assert.assertFalse(resolveResult.isLastInternalDirLink());
+  }
+
+  @Test
+  public void testInodeTreeMountPoints() throws Exception {
+    List<InodeTree.MountPoint<FileSystem>> mountPoints = inodeTree.getMountPoints();
+    Assert.assertEquals(6, mountPoints.size());
+  }
+}

+ 1 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsConfig.java

@@ -32,6 +32,7 @@ public class TestViewFsConfig {
   @Test(expected = FileAlreadyExistsException.class)
   @Test(expected = FileAlreadyExistsException.class)
   public void testInvalidConfig() throws IOException, URISyntaxException {
   public void testInvalidConfig() throws IOException, URISyntaxException {
     Configuration conf = new Configuration();
     Configuration conf = new Configuration();
+    ConfigUtil.setIsNestedMountPointSupported(conf, false);
     ConfigUtil.addLink(conf, "/internalDir/linkToDir2",
     ConfigUtil.addLink(conf, "/internalDir/linkToDir2",
         new Path("file:///dir2").toUri());
         new Path("file:///dir2").toUri());
     ConfigUtil.addLink(conf, "/internalDir/linkToDir2/linkToDir3",
     ConfigUtil.addLink(conf, "/internalDir/linkToDir2/linkToDir3",

+ 142 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemBaseTest.java

@@ -460,6 +460,148 @@ abstract public class ViewFileSystemBaseTest {
         .assertIsFile(fsTarget, new Path(targetTestRoot, "data/fooBar"));
         .assertIsFile(fsTarget, new Path(targetTestRoot, "data/fooBar"));
   }
   }
 
 
+
+  // rename across nested mount points that point to same target also fail
+  @Test
+  public void testRenameAcrossNestedMountPointSameTarget() throws IOException {
+    setUpNestedMountPoint();
+    fileSystemTestHelper.createFile(fsView, "/user/foo");
+    try {
+      // Nested mount points point to the same target should fail
+      // /user -> /user
+      // /user/userA -> /user
+      // Rename strategy: SAME_MOUNTPOINT
+      fsView.rename(new Path("/user/foo"), new Path("/user/userA/foo"));
+      ContractTestUtils.fail("IOException is not thrown on rename operation");
+    } catch (IOException e) {
+      GenericTestUtils
+          .assertExceptionContains("Renames across Mount points not supported",
+              e);
+    }
+  }
+
+
+  // rename across nested mount points fail if the mount link targets are different
+  // even if the targets are part of the same target FS
+  @Test
+  public void testRenameAcrossMountPointDifferentTarget() throws IOException {
+    setUpNestedMountPoint();
+    fileSystemTestHelper.createFile(fsView, "/data/foo");
+    // /data -> /data
+    // /data/dataA -> /dataA
+    // Rename strategy: SAME_MOUNTPOINT
+    try {
+      fsView.rename(new Path("/data/foo"), new Path("/data/dataA/fooBar"));
+      ContractTestUtils.fail("IOException is not thrown on rename operation");
+    } catch (IOException e) {
+      GenericTestUtils
+          .assertExceptionContains("Renames across Mount points not supported",
+              e);
+    }
+  }
+
+  // RenameStrategy SAME_TARGET_URI_ACROSS_MOUNTPOINT enabled
+  // to rename across nested mount points that point to same target URI
+  @Test
+  public void testRenameAcrossNestedMountPointSameTargetUriAcrossMountPoint() throws IOException {
+    setUpNestedMountPoint();
+    //  /user/foo -> /user
+    // /user/userA/fooBarBar -> /user
+    // Rename strategy: SAME_TARGET_URI_ACROSS_MOUNTPOINT
+    Configuration conf2 = new Configuration(conf);
+    conf2.set(Constants.CONFIG_VIEWFS_RENAME_STRATEGY,
+        ViewFileSystem.RenameStrategy.SAME_TARGET_URI_ACROSS_MOUNTPOINT
+            .toString());
+    FileSystem fsView2 = FileSystem.newInstance(FsConstants.VIEWFS_URI, conf2);
+    fileSystemTestHelper.createFile(fsView2, "/user/foo");
+    fsView2.rename(new Path("/user/foo"), new Path("/user/userA/fooBarBar"));
+    ContractTestUtils.assertPathDoesNotExist(fsView2, "src should not exist after rename",
+        new Path("/user/foo"));
+    ContractTestUtils.assertPathDoesNotExist(fsTarget, "src should not exist after rename",
+        new Path(targetTestRoot, "user/foo"));
+    ContractTestUtils.assertIsFile(fsView2, fileSystemTestHelper.getTestRootPath(fsView2, "/user/userA/fooBarBar"));
+    ContractTestUtils.assertIsFile(fsTarget, new Path(targetTestRoot, "user/fooBarBar"));
+  }
+
+  // RenameStrategy SAME_FILESYSTEM_ACROSS_MOUNTPOINT enabled
+  // to rename across mount points where the mount link targets are different
+  // but are part of the same target FS
+  @Test
+  public void testRenameAcrossNestedMountPointSameFileSystemAcrossMountPoint() throws IOException {
+    setUpNestedMountPoint();
+    // /data/foo -> /data
+    // /data/dataA/fooBar -> /dataA
+    // Rename strategy: SAME_FILESYSTEM_ACROSS_MOUNTPOINT
+    Configuration conf2 = new Configuration(conf);
+    conf2.set(Constants.CONFIG_VIEWFS_RENAME_STRATEGY,
+        ViewFileSystem.RenameStrategy.SAME_FILESYSTEM_ACROSS_MOUNTPOINT
+            .toString());
+    FileSystem fsView2 = FileSystem.newInstance(FsConstants.VIEWFS_URI, conf2);
+    fileSystemTestHelper.createFile(fsView2, "/data/foo");
+    fsView2.rename(new Path("/data/foo"), new Path("/data/dataB/fooBar"));
+    ContractTestUtils
+        .assertPathDoesNotExist(fsView2, "src should not exist after rename",
+            new Path("/data/foo"));
+    ContractTestUtils
+        .assertPathDoesNotExist(fsTarget, "src should not exist after rename",
+            new Path(targetTestRoot, "data/foo"));
+    ContractTestUtils.assertIsFile(fsView2,
+        fileSystemTestHelper.getTestRootPath(fsView2, "/user/fooBar"));
+    ContractTestUtils
+        .assertIsFile(fsTarget, new Path(targetTestRoot, "user/fooBar"));
+  }
+
+  @Test
+  public void testOperationsThroughNestedMountPointsInternal()
+      throws IOException {
+    setUpNestedMountPoint();
+    // Create file with nested mount point
+    fileSystemTestHelper.createFile(fsView, "/user/userB/foo");
+    Assert.assertTrue("Created file should be type file",
+        fsView.getFileStatus(new Path("/user/userB/foo")).isFile());
+    Assert.assertTrue("Target of created file should be type file",
+        fsTarget.getFileStatus(new Path(targetTestRoot,"userB/foo")).isFile());
+
+    // Delete the created file with nested mount point
+    Assert.assertTrue("Delete should succeed",
+        fsView.delete(new Path("/user/userB/foo"), false));
+    Assert.assertFalse("File should not exist after delete",
+        fsView.exists(new Path("/user/userB/foo")));
+    Assert.assertFalse("Target File should not exist after delete",
+        fsTarget.exists(new Path(targetTestRoot,"userB/foo")));
+
+    // Create file with a 2 component dirs with nested mount point
+    fileSystemTestHelper.createFile(fsView, "/internalDir/linkToDir2/linkToDir2/foo");
+    Assert.assertTrue("Created file should be type file",
+        fsView.getFileStatus(new Path("/internalDir/linkToDir2/linkToDir2/foo")).isFile());
+    Assert.assertTrue("Target of created file should be type file",
+        fsTarget.getFileStatus(new Path(targetTestRoot,"linkToDir2/foo")).isFile());
+
+    // Delete the created file with nested mount point
+    Assert.assertTrue("Delete should succeed",
+        fsView.delete(new Path("/internalDir/linkToDir2/linkToDir2/foo"), false));
+    Assert.assertFalse("File should not exist after delete",
+        fsView.exists(new Path("/internalDir/linkToDir2/linkToDir2/foo")));
+    Assert.assertFalse("Target File should not exist after delete",
+        fsTarget.exists(new Path(targetTestRoot,"linkToDir2/foo")));
+  }
+
+  private void setUpNestedMountPoint() throws IOException {
+    // Enable nested mount point, ViewFilesystem should support both non-nested and nested mount points
+    ConfigUtil.setIsNestedMountPointSupported(conf, true);
+    ConfigUtil.addLink(conf, "/user/userA",
+        new Path(targetTestRoot, "user").toUri());
+    ConfigUtil.addLink(conf, "/user/userB",
+        new Path(targetTestRoot, "userB").toUri());
+    ConfigUtil.addLink(conf, "/data/dataA",
+        new Path(targetTestRoot, "dataA").toUri());
+    ConfigUtil.addLink(conf, "/data/dataB",
+        new Path(targetTestRoot, "user").toUri());
+    ConfigUtil.addLink(conf, "/internalDir/linkToDir2/linkToDir2",
+        new Path(targetTestRoot,"linkToDir2").toUri());
+    fsView = FileSystem.get(FsConstants.VIEWFS_URI, conf);
+  }
+
   static protected boolean SupportsBlocks = false; //  local fs use 1 block
   static protected boolean SupportsBlocks = false; //  local fs use 1 block
                                                    // override for HDFS
                                                    // override for HDFS
   @Test
   @Test