
HDFS-15533: Provide DFS API compatible class, but use ViewFileSystemOverloadScheme inside. (#2229). Contributed by Uma Maheswara Rao G.

(cherry picked from commit dd013f2fdf1ecbeb6c877e26951cd0d8922058b0)
Uma Maheswara Rao G 5 years ago
parent
commit
ba0eca6a2c
15 changed files with 2794 additions and 89 deletions
  1. +0 -1
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsConstants.java
  2. +4 -3
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java
  3. +6 -5
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
  4. +71 -4
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystemOverloadScheme.java
  5. +11 -3
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
  6. +2307 -0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ViewDistributedFileSystem.java
  7. +0 -6
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
  8. +2 -2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFSOverloadSchemeWithMountTableConfigInHDFS.java
  9. +94 -48
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemOverloadSchemeWithHdfsScheme.java
  10. +14 -14
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
  11. +47 -0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestViewDistributedFileSystem.java
  12. +94 -0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestViewDistributedFileSystemContract.java
  13. +64 -0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestViewDistributedFileSystemWithMountLinks.java
  14. +24 -3
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
  15. +56 -0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectivesWithViewDFS.java
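
For context, a minimal sketch of how a client opts into the new class; this is an illustration, not part of the commit. The mount-link keys follow the standard fs.viewfs.mounttable pattern, and the ns1/ns2 cluster names and /data link are hypothetical.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ViewHdfsQuickStart {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Instead of fs.hdfs.impl=org.apache.hadoop.fs.viewfs.ViewFileSystemOverloadScheme,
    // point hdfs:// at the DFS-compatible wrapper this commit adds.
    conf.set("fs.hdfs.impl",
        "org.apache.hadoop.hdfs.ViewDistributedFileSystem");
    // Hypothetical mount table for cluster ns1: one link plus a fallback to
    // the base cluster, as the class javadoc below recommends.
    conf.set("fs.viewfs.mounttable.ns1.link./data", "hdfs://ns2/data");
    conf.set("fs.viewfs.mounttable.ns1.linkFallback", "hdfs://ns1/");

    try (FileSystem fs = FileSystem.get(URI.create("hdfs://ns1/"), conf)) {
      // Paths under /data resolve to ns2; everything else uses the fallback.
      fs.listStatus(new Path("/data"));
    }
  }
}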

+ 0 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsConstants.java

@@ -45,5 +45,4 @@ public interface FsConstants {
   String FS_VIEWFS_OVERLOAD_SCHEME_TARGET_FS_IMPL_PATTERN =
       "fs.viewfs.overload.scheme.target.%s.impl";
   String VIEWFS_TYPE = "viewfs";
-  String VIEWFSOS_TYPE = "viewfsOverloadScheme";
 }

+ 4 - 3
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java

@@ -599,9 +599,10 @@ abstract class InodeTree<T> {
 
     if (!gotMountTableEntry) {
       if (!initingUriAsFallbackOnNoMounts) {
-        throw new IOException(
-            "ViewFs: Cannot initialize: Empty Mount table in config for "
-                + "viewfs://" + mountTableName + "/");
+        throw new IOException(new StringBuilder(
+            "ViewFs: Cannot initialize: Empty Mount table in config for ")
+            .append(theUri.getScheme()).append("://").append(mountTableName)
+            .append("/").toString());
       }
       StringBuilder msg =
           new StringBuilder("Empty mount table detected for ").append(theUri)

+ 6 - 5
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java

@@ -259,13 +259,14 @@ public class ViewFileSystem extends FileSystem {
   }
 
   /**
-   * Returns the ViewFileSystem type.
-   * @return <code>viewfs</code>
+   * Returns false, as ViewFileSystem does not support automatically adding
+   * a fallback link when no mounts are configured.
    */
-  String getType() {
-    return FsConstants.VIEWFS_TYPE;
+  boolean supportAutoAddingFallbackOnNoMounts() {
+    return false;
   }
 
+
   /**
    * Called after a new FileSystem instance is constructed.
    * @param theUri a uri whose authority section names the host, port, etc. for
@@ -293,7 +294,7 @@ public class ViewFileSystem extends FileSystem {
     try {
       myUri = new URI(getScheme(), authority, "/", null, null);
       boolean initingUriAsFallbackOnNoMounts =
-          !FsConstants.VIEWFS_TYPE.equals(getType());
+          supportAutoAddingFallbackOnNoMounts();
       fsState = new InodeTree<FileSystem>(conf, tableName, myUri,
           initingUriAsFallbackOnNoMounts) {
         @Override
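
The string-typed getType() check is replaced by a boolean hook that subclasses override. A minimal sketch of a subclass opting into fallback-on-no-mounts (the class name is hypothetical; it must live in the org.apache.hadoop.fs.viewfs package because the hook is package-private):

package org.apache.hadoop.fs.viewfs;

// Hypothetical subclass: treat the initializing URI as a fallback link
// whenever the mount table is empty, as ViewFileSystemOverloadScheme does.
class AutoFallbackViewFileSystem extends ViewFileSystem {
  @Override
  boolean supportAutoAddingFallbackOnNoMounts() {
    return true;
  }
}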

+ 71 - 4
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystemOverloadScheme.java

@@ -104,6 +104,7 @@ import static org.apache.hadoop.fs.viewfs.Constants.CONFIG_VIEWFS_IGNORE_PORT_IN
 @InterfaceStability.Evolving
 public class ViewFileSystemOverloadScheme extends ViewFileSystem {
   private URI myUri;
+  private boolean supportAutoAddingFallbackOnNoMounts = true;
   public ViewFileSystemOverloadScheme() throws IOException {
     super();
   }
@@ -114,11 +115,19 @@ public class ViewFileSystemOverloadScheme extends ViewFileSystem {
   }
 
   /**
-   * Returns the ViewFileSystem type.
-   * @return <code>viewfs</code>
+   * By default returns true, as ViewFileSystemOverloadScheme supports
+   * automatically adding a fallback link when no mounts are configured.
    */
-  String getType() {
-    return FsConstants.VIEWFSOS_TYPE;
+  public boolean supportAutoAddingFallbackOnNoMounts() {
+    return this.supportAutoAddingFallbackOnNoMounts;
+  }
+
+  /**
+   * Sets whether to add a fallback automatically when no mount points are
+   * found.
+   */
+  public void setSupportAutoAddingFallbackOnNoMounts(
+      boolean addAutoFallbackOnNoMounts) {
+    this.supportAutoAddingFallbackOnNoMounts = addAutoFallbackOnNoMounts;
   }
 
   @Override
@@ -287,4 +296,62 @@ public class ViewFileSystemOverloadScheme extends ViewFileSystem {
     }
   }
 
+  /**
+   * Gets the mount path info, which contains the target file system and
+   * remaining path to pass to the target file system.
+   */
+  public MountPathInfo<FileSystem> getMountPathInfo(Path path,
+      Configuration conf) throws IOException {
+    InodeTree.ResolveResult<FileSystem> res;
+    try {
+      res = fsState.resolve(getUriPath(path), true);
+      FileSystem fs = res.isInternalDir() ?
+          (fsState.getRootFallbackLink() != null ?
+              ((ChRootedFileSystem) fsState
+                  .getRootFallbackLink().targetFileSystem).getMyFs() :
+              fsGetter().get(path.toUri(), conf)) :
+          ((ChRootedFileSystem) res.targetFileSystem).getMyFs();
+      return new MountPathInfo<FileSystem>(res.remainingPath, res.resolvedPath,
+          fs);
+    } catch (FileNotFoundException e) {
+      // No link configured with passed path.
+      throw new NotInMountpointException(path,
+          "No link found for the given path.");
+    }
+  }
+
+  /**
+   * A class to maintain the target file system and a path to pass to the target
+   * file system.
+   */
+  public static class MountPathInfo<T> {
+    private Path pathOnTarget;
+    private T targetFs;
+
+    public MountPathInfo(Path pathOnTarget, String resolvedPath, T targetFs) {
+      this.pathOnTarget = pathOnTarget;
+      this.targetFs = targetFs;
+    }
+
+    public Path getPathOnTarget() {
+      return this.pathOnTarget;
+    }
+
+    public T getTargetFs() {
+      return this.targetFs;
+    }
+  }
+
+  /**
+   * @return the configured fallback file system. Usually, this will be the
+   * default cluster.
+   */
+  public FileSystem getFallbackFileSystem() {
+    if (fsState.getRootFallbackLink() == null) {
+      return null;
+    }
+    return ((ChRootedFileSystem) fsState.getRootFallbackLink().targetFileSystem)
+        .getMyFs();
+  }
+
 }

+ 11 - 3
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java

@@ -183,7 +183,7 @@ public class DistributedFileSystem extends FileSystem
       throw new IOException("Incomplete HDFS URI, no host: "+ uri);
     }
 
-    this.dfs = new DFSClient(uri, conf, statistics);
+    initDFSClient(uri, conf);
     this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority());
     this.workingDir = getHomeDirectory();
 
@@ -197,6 +197,10 @@ public class DistributedFileSystem extends FileSystem
           });
   }
 
+  void initDFSClient(URI theUri, Configuration conf) throws IOException {
+    this.dfs = new DFSClient(theUri, conf, statistics);
+  }
+
   @Override
   public Path getWorkingDirectory() {
     return workingDir;
@@ -1507,10 +1511,14 @@ public class DistributedFileSystem extends FileSystem
   @Override
   public void close() throws IOException {
     try {
-      dfs.closeOutputStreams(false);
+      if (dfs != null) {
+        dfs.closeOutputStreams(false);
+      }
       super.close();
     } finally {
-      dfs.close();
+      if (dfs != null) {
+        dfs.close();
+      }
     }
   }
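
Note the new null guards: a subclass may override initDFSClient to skip client creation entirely (ViewDistributedFileSystem below does exactly that), so dfs can legitimately still be null when close() runs.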
 

+ 2307 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ViewDistributedFileSystem.java

@@ -0,0 +1,2307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.BlockStoragePolicySpi;
+import org.apache.hadoop.fs.CacheFlag;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsServerDefaults;
+import org.apache.hadoop.fs.FsStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.PartialListing;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.PathHandle;
+import org.apache.hadoop.fs.QuotaUsage;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.fs.XAttrSetFlag;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.viewfs.ViewFileSystem;
+import org.apache.hadoop.fs.viewfs.ViewFileSystemOverloadScheme;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
+import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ECTopologyVerifierResult;
+import org.apache.hadoop.hdfs.protocol.EncryptionZone;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsPathHandle;
+import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
+import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
+import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
+import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
+import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
+import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.io.MultipleIOException;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.token.DelegationTokenIssuer;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Progressable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/**
+ * ViewDistributedFileSystem extends DistributedFileSystem with additional
+ * mounting functionality. The goal is better API compatibility for HDFS
+ * users when using a mounting filesystem (ViewFileSystemOverloadScheme).
+ * {@link ViewFileSystemOverloadScheme} is a filesystem that inherits its
+ * mounting functionality from ViewFileSystem.
+ * Users who enable ViewFileSystemOverloadScheme by setting
+ * fs.hdfs.impl=org.apache.hadoop.fs.viewfs.ViewFileSystemOverloadScheme can
+ * now set fs.hdfs.impl=org.apache.hadoop.hdfs.ViewDistributedFileSystem
+ * instead, so that HDFS users get a closely compatible API together with
+ * mount functionality. All other schemes can continue to use the
+ * ViewFileSystemOverloadScheme class directly for mount functionality; note
+ * that ViewFileSystemOverloadScheme provides only the
+ * {@link ViewFileSystem} APIs.
+ * If this class is configured but no mount points are configured, it simply
+ * behaves like the existing DistributedFileSystem class. If both
+ * fs.hdfs.impl and mount configurations are set, users can call the APIs
+ * available in this class; they are the DFS APIs, but they are delegated to
+ * viewfs functionality. Note that APIs without a path argument (ex:
+ * isInSafeMode) are delegated to the default filesystem only, that is, the
+ * configured fallback link. To make such calls on a specific child
+ * filesystem, you may want to initialize it separately and call it
+ * directly. With ViewDistributedFileSystem we strongly recommend
+ * configuring linkFallback when adding mount links, and it is recommended
+ * to point it to your base cluster, usually your current fs.defaultFS if
+ * that points to hdfs.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class ViewDistributedFileSystem extends DistributedFileSystem {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(ViewDistributedFileSystem.class);
+
+  // A mounting file system.
+  private ViewFileSystemOverloadScheme vfs;
+  // A default DFS, which should have been set via linkFallback.
+  private DistributedFileSystem defaultDFS;
+
+  @Override
+  public void initialize(URI uri, Configuration conf) throws IOException {
+    super.initialize(uri, conf);
+    try {
+      this.vfs = tryInitializeMountingViewFs(uri, conf);
+    } catch (IOException ioe) {
+      LOGGER.debug(new StringBuilder("Mount tree initialization failed with ")
+          .append("the reason => {}. Falling back to regular DFS")
+          .append(" initialization. Please re-initialize the fs after updating")
+          .append(" mount point.").toString(), ioe.getMessage());
+      // Previous super.initialize would have skipped the dfsclient init and
+      // setWorkingDirectory as we planned to initialize vfs. Since vfs init
+      // failed, let's init dfsClient now.
+      super.initDFSClient(uri, conf);
+      super.setWorkingDirectory(super.getHomeDirectory());
+      return;
+    }
+
+    setConf(conf);
+    // A child DFS with the currently initialized URI. This must be the same
+    // as the fallback fs. The fallback must point to the root of your
+    // filesystems. Some APIs (without a path argument, for example
+    // isInSafeMode) are supported only on the base cluster filesystem, and
+    // only those APIs will use this fs.
+    defaultDFS = (DistributedFileSystem) this.vfs.getFallbackFileSystem();
+    // Please don't access internal dfs client directly except in tests.
+    dfs = (defaultDFS != null) ? defaultDFS.dfs : null;
+    super.setWorkingDirectory(this.vfs.getHomeDirectory());
+  }
+
+  @Override
+  void initDFSClient(URI uri, Configuration conf) throws IOException {
+    // Since we plan to initialize vfs in this class, we will not need to
+    // initialize DFS client.
+  }
+
+  public ViewDistributedFileSystem() {
+  }
+
+  private ViewFileSystemOverloadScheme tryInitializeMountingViewFs(URI theUri,
+      Configuration conf) throws IOException {
+    ViewFileSystemOverloadScheme viewFs = new ViewFileSystemOverloadScheme();
+    viewFs.setSupportAutoAddingFallbackOnNoMounts(false);
+    viewFs.initialize(theUri, conf);
+    return viewFs;
+  }
+
+  @Override
+  public URI getUri() {
+    if (this.vfs == null) {
+      return super.getUri();
+    }
+    return this.vfs.getUri();
+  }
+
+  @Override
+  public String getScheme() {
+    if (this.vfs == null) {
+      return super.getScheme();
+    }
+    return this.vfs.getScheme();
+  }
+
+  @Override
+  public Path getWorkingDirectory() {
+    if (this.vfs == null) {
+      return super.getWorkingDirectory();
+    }
+    return this.vfs.getWorkingDirectory();
+  }
+
+  @Override
+  public void setWorkingDirectory(Path dir) {
+    if (this.vfs == null) {
+      super.setWorkingDirectory(dir);
+      return;
+    }
+    this.vfs.setWorkingDirectory(dir);
+  }
+
+  @Override
+  public Path getHomeDirectory() {
+    if (super.dfs == null) {
+      return null;
+    }
+    if (this.vfs == null) {
+      return super.getHomeDirectory();
+    }
+    return this.vfs.getHomeDirectory();
+  }
+
+  /**
+   * Returns getHedgedReadMetrics for the default cluster only.
+   */
+  @Override
+  public DFSHedgedReadMetrics getHedgedReadMetrics() {
+    if (this.vfs == null) {
+      return super.getHedgedReadMetrics();
+    }
+    checkDefaultDFS(defaultDFS, "getHedgedReadMetrics");
+    return defaultDFS.getHedgedReadMetrics();
+  }
+
+  @Override
+  public BlockLocation[] getFileBlockLocations(FileStatus fs, long start,
+      long len) throws IOException {
+    if (this.vfs == null) {
+      return super.getFileBlockLocations(fs, start, len);
+    }
+    return this.vfs.getFileBlockLocations(fs, start, len);
+  }
+
+  @Override
+  public BlockLocation[] getFileBlockLocations(Path p, final long start,
+      final long len) throws IOException {
+    if (this.vfs == null) {
+      return super.getFileBlockLocations(p, start, len);
+    }
+    return this.vfs.getFileBlockLocations(p, start, len);
+  }
+
+  @Override
+  public void setVerifyChecksum(final boolean verifyChecksum) {
+    if (this.vfs == null) {
+      super.setVerifyChecksum(verifyChecksum);
+      return;
+    }
+    this.vfs.setVerifyChecksum(verifyChecksum);
+  }
+
+  @Override
+  public boolean recoverLease(final Path f) throws IOException {
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
+        this.vfs.getMountPathInfo(f, getConf());
+    checkDFS(mountPathInfo.getTargetFs(), "recoverLease");
+    return ((DistributedFileSystem) mountPathInfo.getTargetFs())
+        .recoverLease(mountPathInfo.getPathOnTarget());
+  }
+
+  @Override
+  public FSDataInputStream open(final Path f, final int bufferSize)
+      throws AccessControlException, FileNotFoundException, IOException {
+    if (this.vfs == null) {
+      return super.open(f, bufferSize);
+    }
+
+    return this.vfs.open(f, bufferSize);
+  }
+
+  @Override
+  public FSDataInputStream open(PathHandle fd, int bufferSize)
+      throws IOException {
+    return this.vfs.open(fd, bufferSize);
+  }
+
+  @Override
+  protected HdfsPathHandle createPathHandle(FileStatus st,
+      Options.HandleOpt... opts) {
+    if (this.vfs == null) {
+      return super.createPathHandle(st, opts);
+    }
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public FSDataOutputStream append(final Path f, final int bufferSize,
+      final Progressable progress) throws IOException {
+    if (this.vfs == null) {
+      return super.append(f, bufferSize, progress);
+    }
+    return this.vfs.append(f, bufferSize, progress);
+  }
+
+  @Override
+  public FSDataOutputStream append(Path f, final EnumSet<CreateFlag> flag,
+      final int bufferSize, final Progressable progress) throws IOException {
+    if (this.vfs == null) {
+      return super.append(f, flag, bufferSize, progress);
+    }
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
+        this.vfs.getMountPathInfo(f, getConf());
+    checkDFS(mountPathInfo.getTargetFs(), "append");
+    return ((DistributedFileSystem) mountPathInfo.getTargetFs())
+        .append(mountPathInfo.getPathOnTarget(), flag, bufferSize, progress);
+  }
+
+  @Override
+  public FSDataOutputStream append(Path f, final EnumSet<CreateFlag> flag,
+      final int bufferSize, final Progressable progress,
+      final InetSocketAddress[] favoredNodes) throws IOException {
+    if (this.vfs == null) {
+      return super.append(f, flag, bufferSize, progress, favoredNodes);
+    }
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
+        this.vfs.getMountPathInfo(f, getConf());
+    checkDFS(mountPathInfo.getTargetFs(), "append");
+    return ((DistributedFileSystem) mountPathInfo.getTargetFs())
+        .append(mountPathInfo.getPathOnTarget(), flag, bufferSize, progress,
+            favoredNodes);
+  }
+
+  @Override
+  public FSDataOutputStream create(Path f, FsPermission permission,
+      boolean overwrite, int bufferSize, short replication, long blockSize,
+      Progressable progress) throws IOException {
+    if (this.vfs == null) {
+      return super
+          .create(f, permission, overwrite, bufferSize, replication, blockSize,
+              progress);
+    }
+    return this.vfs
+        .create(f, permission, overwrite, bufferSize, replication, blockSize,
+            progress);
+  }
+
+  @Override
+  public HdfsDataOutputStream create(final Path f,
+      final FsPermission permission, final boolean overwrite,
+      final int bufferSize, final short replication, final long blockSize,
+      final Progressable progress, final InetSocketAddress[] favoredNodes)
+      throws IOException {
+    if (this.vfs == null) {
+      return super
+          .create(f, permission, overwrite, bufferSize, replication, blockSize,
+              progress, favoredNodes);
+    }
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
+        this.vfs.getMountPathInfo(f, getConf());
+    checkDFS(mountPathInfo.getTargetFs(), "create");
+    return ((DistributedFileSystem) mountPathInfo.getTargetFs())
+        .create(mountPathInfo.getPathOnTarget(), permission, overwrite,
+            bufferSize, replication, blockSize, progress, favoredNodes);
+  }
+
+  @Override
+  //DFS specific API
+  public FSDataOutputStream create(final Path f, final FsPermission permission,
+      final EnumSet<CreateFlag> cflags, final int bufferSize,
+      final short replication, final long blockSize,
+      final Progressable progress, final Options.ChecksumOpt checksumOpt)
+      throws IOException {
+    if (this.vfs == null) {
+      return super
+          .create(f, permission, cflags, bufferSize, replication, blockSize,
+              progress, checksumOpt);
+    }
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
+        this.vfs.getMountPathInfo(f, getConf());
+    checkDFS(mountPathInfo.getTargetFs(), "create");
+    return mountPathInfo.getTargetFs()
+        .create(mountPathInfo.getPathOnTarget(), permission, cflags, bufferSize,
+            replication, blockSize, progress, checksumOpt);
+  }
+
+  void checkDFS(FileSystem fs, String methodName) {
+    if (!(fs instanceof DistributedFileSystem)) {
+      String msg = new StringBuilder("This API:").append(methodName)
+          .append(" is specific to DFS. Can't run on other fs:")
+          .append(fs.getUri()).toString();
+      throw new UnsupportedOperationException(msg);
+    }
+  }
+
+  void checkDefaultDFS(FileSystem fs, String methodName) {
+    if (fs == null) {
+      String msg = new StringBuilder("This API:").append(methodName).append(
+          " cannot be supported without default cluster(that is linkFallBack).")
+          .toString();
+      throw new UnsupportedOperationException(msg);
+    }
+  }
+
+  @Override
+  // DFS specific API
+  protected HdfsDataOutputStream primitiveCreate(Path f,
+      FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
+      short replication, long blockSize, Progressable progress,
+      Options.ChecksumOpt checksumOpt) throws IOException {
+    if (this.vfs == null) {
+      return super
+          .primitiveCreate(f, absolutePermission, flag, bufferSize, replication,
+              blockSize, progress, checksumOpt);
+    }
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
+        this.vfs.getMountPathInfo(f, getConf());
+    checkDFS(mountPathInfo.getTargetFs(), "primitiveCreate");
+    return ((DistributedFileSystem) mountPathInfo.getTargetFs())
+        .primitiveCreate(mountPathInfo.getPathOnTarget(), absolutePermission,
+            flag, bufferSize, replication, blockSize, progress, checksumOpt);
+  }
+
+  @Override
+  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
+      EnumSet<CreateFlag> flags, int bufferSize, short replication,
+      long blockSize, Progressable progress) throws IOException {
+    if (this.vfs == null) {
+      return super
+          .createNonRecursive(f, permission, flags, bufferSize, replication,
+              blockSize, progress);
+    }
+    return this.vfs
+        .createNonRecursive(f, permission, flags, bufferSize, replication,
+            blockSize, progress);
+  }
+
+  @Override
+  public boolean setReplication(final Path f, final short replication)
+      throws AccessControlException, FileNotFoundException, IOException {
+    if (this.vfs == null) {
+      return super.setReplication(f, replication);
+    }
+    return this.vfs.setReplication(f, replication);
+  }
+
+  @Override
+  public void setStoragePolicy(Path src, String policyName) throws IOException {
+    if (this.vfs == null) {
+      super.setStoragePolicy(src, policyName);
+      return;
+    }
+    this.vfs.setStoragePolicy(src, policyName);
+  }
+
+  @Override
+  public void unsetStoragePolicy(Path src) throws IOException {
+    if (this.vfs == null) {
+      super.unsetStoragePolicy(src);
+      return;
+    }
+    this.vfs.unsetStoragePolicy(src);
+  }
+
+  @Override
+  public BlockStoragePolicySpi getStoragePolicy(Path src) throws IOException {
+    if (this.vfs == null) {
+      return super.getStoragePolicy(src);
+    }
+    return this.vfs.getStoragePolicy(src);
+  }
+
+  @Override
+  public Collection<BlockStoragePolicy> getAllStoragePolicies()
+      throws IOException {
+    if (this.vfs == null) {
+      return super.getAllStoragePolicies();
+    }
+    Collection<? extends BlockStoragePolicySpi> allStoragePolicies =
+        this.vfs.getAllStoragePolicies();
+    return (Collection<BlockStoragePolicy>) allStoragePolicies;
+  }
+
+  @Override
+  public long getBytesWithFutureGenerationStamps() throws IOException {
+    if (this.vfs == null) {
+      return super.getBytesWithFutureGenerationStamps();
+    }
+    checkDefaultDFS(defaultDFS, "getBytesWithFutureGenerationStamps");
+    return defaultDFS.getBytesWithFutureGenerationStamps();
+  }
+
+  @Deprecated
+  @Override
+  public BlockStoragePolicy[] getStoragePolicies() throws IOException {
+    if (this.vfs == null) {
+      return super.getStoragePolicies();
+    }
+    checkDefaultDFS(defaultDFS, "getStoragePolicies");
+    return defaultDFS.getStoragePolicies();
+  }
+
+  @Override
+  //Make sure your target fs supports this API, otherwise you will get
+  // Unsupported operation exception.
+  public void concat(Path trg, Path[] psrcs) throws IOException {
+    if (this.vfs == null) {
+      super.concat(trg, psrcs);
+      return;
+    }
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
+        this.vfs.getMountPathInfo(trg, getConf());
+    mountPathInfo.getTargetFs().concat(mountPathInfo.getPathOnTarget(), psrcs);
+  }
+
+  @SuppressWarnings("deprecation")
+  @Override
+  public boolean rename(final Path src, final Path dst) throws IOException {
+    if (this.vfs == null) {
+      return super.rename(src, dst);
+    }
+    return this.vfs.rename(src, dst);
+  }
+
+  @SuppressWarnings("deprecation")
+  @Override
+  public void rename(Path src, Path dst, final Options.Rename... options)
+      throws IOException {
+    if (this.vfs == null) {
+      super.rename(src, dst, options);
+      return;
+    }
+
+    // TODO: revisit
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountSrcPathInfo =
+        this.vfs.getMountPathInfo(src, getConf());
+    checkDFS(mountSrcPathInfo.getTargetFs(), "rename");
+
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountDstPathInfo =
+        this.vfs.getMountPathInfo(dst, getConf());
+    checkDFS(mountDstPathInfo.getTargetFs(), "rename");
+
+    //Check both in same cluster.
+    if (!mountSrcPathInfo.getTargetFs().getUri()
+        .equals(mountDstPathInfo.getTargetFs().getUri())) {
+      throw new HadoopIllegalArgumentException(
+          "Can't rename across file systems.");
+    }
+
+    ((DistributedFileSystem) mountSrcPathInfo.getTargetFs())
+        .rename(mountSrcPathInfo.getPathOnTarget(),
+            mountDstPathInfo.getPathOnTarget(), options);
+  }
+
+  @Override
+  public boolean truncate(final Path f, final long newLength)
+      throws IOException {
+    if (this.vfs == null) {
+      return super.truncate(f, newLength);
+    }
+    return this.vfs.truncate(f, newLength);
+  }
+
+  public boolean delete(final Path f, final boolean recursive)
+      throws AccessControlException, FileNotFoundException, IOException {
+    if (this.vfs == null) {
+      return super.delete(f, recursive);
+    }
+    return this.vfs.delete(f, recursive);
+  }
+
+  @Override
+  public ContentSummary getContentSummary(Path f) throws IOException {
+    if (this.vfs == null) {
+      return super.getContentSummary(f);
+    }
+    return this.vfs.getContentSummary(f);
+  }
+
+  @Override
+  public QuotaUsage getQuotaUsage(Path f) throws IOException {
+    if (this.vfs == null) {
+      return super.getQuotaUsage(f);
+    }
+    return this.vfs.getQuotaUsage(f);
+  }
+
+  @Override
+  public void setQuota(Path src, final long namespaceQuota,
+      final long storagespaceQuota) throws IOException {
+    if (this.vfs == null) {
+      super.setQuota(src, namespaceQuota, storagespaceQuota);
+      return;
+    }
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
+        this.vfs.getMountPathInfo(src, getConf());
+    mountPathInfo.getTargetFs()
+        .setQuota(mountPathInfo.getPathOnTarget(), namespaceQuota,
+            storagespaceQuota);
+  }
+
+  @Override
+  public void setQuotaByStorageType(Path src, final StorageType type,
+      final long quota) throws IOException {
+    if (this.vfs == null) {
+      super.setQuotaByStorageType(src, type, quota);
+      return;
+    }
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
+        this.vfs.getMountPathInfo(src, getConf());
+    mountPathInfo.getTargetFs()
+        .setQuotaByStorageType(mountPathInfo.getPathOnTarget(), type, quota);
+  }
+
+  @Override
+  public FileStatus[] listStatus(Path p) throws IOException {
+    if (this.vfs == null) {
+      return super.listStatus(p);
+    }
+    return this.vfs.listStatus(p);
+  }
+
+  @Override
+  public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
+      final PathFilter filter) throws FileNotFoundException, IOException {
+    if (this.vfs == null) {
+      return super.listLocatedStatus(f, filter);
+    }
+    return this.vfs.listLocatedStatus(f, filter);
+  }
+
+  @Override
+  public RemoteIterator<FileStatus> listStatusIterator(final Path p)
+      throws IOException {
+    if (this.vfs == null) {
+      return super.listStatusIterator(p);
+    }
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
+        this.vfs.getMountPathInfo(p, getConf());
+    return mountPathInfo.getTargetFs()
+        .listStatusIterator(mountPathInfo.getPathOnTarget());
+  }
+
+  @Override
+  public RemoteIterator<PartialListing<FileStatus>> batchedListStatusIterator(
+      final List<Path> paths) throws IOException {
+    if (this.vfs == null) {
+      return super.batchedListStatusIterator(paths);
+    }
+    // TODO: revisit for correct implementation.
+    return this.defaultDFS.batchedListStatusIterator(paths);
+  }
+
+  @Override
+  public RemoteIterator<PartialListing<LocatedFileStatus>> batchedListLocatedStatusIterator(
+      final List<Path> paths) throws IOException {
+    if (this.vfs == null) {
+      return super.batchedListLocatedStatusIterator(paths);
+    }
+    // TODO: revisit for correct implementation.
+    return this.defaultDFS.batchedListLocatedStatusIterator(paths);
+  }
+
+  public boolean mkdir(Path f, FsPermission permission) throws IOException {
+    if (this.vfs == null) {
+      return super.mkdir(f, permission);
+    }
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
+        this.vfs.getMountPathInfo(f, getConf());
+    checkDFS(mountPathInfo.getTargetFs(), "mkdir");
+    return ((DistributedFileSystem) mountPathInfo.getTargetFs())
+        .mkdir(mountPathInfo.getPathOnTarget(), permission);
+  }
+
+  @Override
+  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+    if (this.vfs == null) {
+      return super.mkdirs(f, permission);
+    }
+    return this.vfs.mkdirs(f, permission);
+  }
+
+  @SuppressWarnings("deprecation")
+  @Override
+  protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
+      throws IOException {
+    if (this.vfs == null) {
+      return super.primitiveMkdir(f, absolutePermission);
+    }
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
+        this.vfs.getMountPathInfo(f, getConf());
+    checkDFS(mountPathInfo.getTargetFs(), "primitiveMkdir");
+    return ((DistributedFileSystem) mountPathInfo.getTargetFs())
+        .primitiveMkdir(mountPathInfo.getPathOnTarget(), absolutePermission);
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (this.vfs != null) {
+      this.vfs.close();
+    }
+    super.close();
+  }
+
+  @InterfaceAudience.Private
+  @Override
+  public DFSClient getClient() {
+    if (this.vfs == null) {
+      return super.getClient();
+    }
+    checkDefaultDFS(defaultDFS, "getClient");
+    return defaultDFS.getClient();
+  }
+
+  @Override
+  public FsStatus getStatus(Path p) throws IOException {
+    if (this.vfs == null) {
+      return super.getStatus(p);
+    }
+    return this.vfs.getStatus(p);
+  }
+
+  @Override
+  public long getMissingBlocksCount() throws IOException {
+    if (this.vfs == null) {
+      return super.getMissingBlocksCount();
+    }
+    checkDefaultDFS(defaultDFS, "getMissingBlocksCount");
+    return defaultDFS.getMissingBlocksCount();
+  }
+
+  @Override
+  public long getPendingDeletionBlocksCount() throws IOException {
+    if (this.vfs == null) {
+      return super.getPendingDeletionBlocksCount();
+    }
+    checkDefaultDFS(defaultDFS, "getPendingDeletionBlocksCount");
+    return defaultDFS.getPendingDeletionBlocksCount();
+  }
+
+  @Override
+  public long getMissingReplOneBlocksCount() throws IOException {
+    if (this.vfs == null) {
+      return super.getMissingReplOneBlocksCount();
+    }
+    checkDefaultDFS(defaultDFS, "getMissingReplOneBlocksCount");
+    return defaultDFS.getMissingReplOneBlocksCount();
+  }
+
+  @Override
+  public long getLowRedundancyBlocksCount() throws IOException {
+    if (this.vfs == null) {
+      return super.getLowRedundancyBlocksCount();
+    }
+    checkDefaultDFS(defaultDFS, "getLowRedundancyBlocksCount");
+    return defaultDFS.getLowRedundancyBlocksCount();
+  }
+
+  @Override
+  public long getCorruptBlocksCount() throws IOException {
+    if (this.vfs == null) {
+      return super.getCorruptBlocksCount();
+    }
+    checkDefaultDFS(defaultDFS, "getCorruptBlocksCount");
+    return defaultDFS.getCorruptBlocksCount();
+  }
+
+  @Override
+  public RemoteIterator<Path> listCorruptFileBlocks(final Path path)
+      throws IOException {
+    if (this.vfs == null) {
+      return super.listCorruptFileBlocks(path);
+    }
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
+        this.vfs.getMountPathInfo(path, getConf());
+    return mountPathInfo.getTargetFs()
+        .listCorruptFileBlocks(mountPathInfo.getPathOnTarget());
+  }
+
+  @Override
+  public DatanodeInfo[] getDataNodeStats() throws IOException {
+    if (this.vfs == null) {
+      return super.getDataNodeStats();
+    }
+    checkDefaultDFS(defaultDFS, "getDataNodeStats");
+    return defaultDFS.getDataNodeStats();
+  }
+
+  @Override
+  public DatanodeInfo[] getDataNodeStats(
+      final HdfsConstants.DatanodeReportType type) throws IOException {
+    if (this.vfs == null) {
+      return super.getDataNodeStats(type);
+    }
+    checkDefaultDFS(defaultDFS, "getDataNodeStats");
+    return defaultDFS.getDataNodeStats(type);
+  }
+
+  @Override
+  public boolean setSafeMode(HdfsConstants.SafeModeAction action)
+      throws IOException {
+    if (this.vfs == null) {
+      return super.setSafeMode(action);
+    }
+    checkDefaultDFS(defaultDFS, "setSafeMode");
+    return defaultDFS.setSafeMode(action);
+  }
+
+  @Override
+  public boolean setSafeMode(HdfsConstants.SafeModeAction action,
+      boolean isChecked) throws IOException {
+    if (this.vfs == null) {
+      return super.setSafeMode(action, isChecked);
+    }
+    checkDefaultDFS(defaultDFS, "setSafeMode");
+    return defaultDFS.setSafeMode(action, isChecked);
+  }
+
+  @Override
+  public boolean saveNamespace(long timeWindow, long txGap) throws IOException {
+    if (this.vfs == null) {
+      return super.saveNamespace(timeWindow, txGap);
+    }
+    checkDefaultDFS(defaultDFS, "saveNamespace");
+    return defaultDFS.saveNamespace(timeWindow, txGap);
+  }
+
+  @Override
+  public void saveNamespace() throws IOException {
+    if (this.vfs == null) {
+      super.saveNamespace();
+      return;
+    }
+    checkDefaultDFS(defaultDFS, "saveNamespace");
+    defaultDFS.saveNamespace();
+  }
+
+  @Override
+  public long rollEdits() throws IOException {
+    if (this.vfs == null) {
+      return super.rollEdits();
+    }
+    checkDefaultDFS(defaultDFS, "rollEdits");
+    return defaultDFS.rollEdits();
+  }
+
+  @Override
+  public boolean restoreFailedStorage(String arg) throws IOException {
+    if (this.vfs == null) {
+      return super.restoreFailedStorage(arg);
+    }
+    checkDefaultDFS(defaultDFS, "restoreFailedStorage");
+    return defaultDFS.restoreFailedStorage(arg);
+  }
+
+  @Override
+  public void refreshNodes() throws IOException {
+    if (this.vfs == null) {
+      super.refreshNodes();
+      return;
+    }
+    checkDefaultDFS(defaultDFS, "refreshNodes");
+    defaultDFS.refreshNodes();
+  }
+
+  @Override
+  public void finalizeUpgrade() throws IOException {
+    if (this.vfs == null) {
+      super.finalizeUpgrade();
+      return;
+    }
+    checkDefaultDFS(defaultDFS, "finalizeUpgrade");
+    defaultDFS.finalizeUpgrade();
+  }
+
+  @Override
+  public boolean upgradeStatus() throws IOException {
+    if (this.vfs == null) {
+      return super.upgradeStatus();
+    }
+    checkDefaultDFS(defaultDFS, "upgradeStatus");
+    return defaultDFS.upgradeStatus();
+  }
+
+  @Override
+  public RollingUpgradeInfo rollingUpgrade(
+      HdfsConstants.RollingUpgradeAction action) throws IOException {
+    if (this.vfs == null) {
+      return super.rollingUpgrade(action);
+    }
+    checkDefaultDFS(defaultDFS, "rollingUpgrade");
+    return defaultDFS.rollingUpgrade(action);
+  }
+
+  @Override
+  public void metaSave(String pathname) throws IOException {
+    if (this.vfs == null) {
+      super.metaSave(pathname);
+      return;
+    }
+    checkDefaultDFS(defaultDFS, "metaSave");
+    defaultDFS.metaSave(pathname);
+  }
+
+  @Override
+  public FsServerDefaults getServerDefaults() throws IOException {
+    if (this.vfs == null) {
+      return super.getServerDefaults();
+    }
+    checkDefaultDFS(defaultDFS, "getServerDefaults");
+    //TODO: Need to revisit.
+    return defaultDFS.getServerDefaults();
+  }
+
+  @Override
+  public FileStatus getFileStatus(final Path f)
+      throws AccessControlException, FileNotFoundException, IOException {
+    if (this.vfs == null) {
+      return super.getFileStatus(f);
+    }
+    return this.vfs.getFileStatus(f);
+  }
+
+  @SuppressWarnings("deprecation")
+  @Override
+  public void createSymlink(final Path target, final Path link,
+      final boolean createParent) throws IOException {
+    // Regular DFS behavior.
+    if (this.vfs == null) {
+      super.createSymlink(target, link, createParent);
+      return;
+    }
+
+    throw new UnsupportedOperationException(
+        "createSymlink is not supported in ViewHDFS");
+  }
+
+  @Override
+  public boolean supportsSymlinks() {
+    if (this.vfs == null) {
+      return super.supportsSymlinks();
+    }
+    // We can enable this later if we want to support symlinks.
+    return false;
+  }
+
+  @Override
+  public FileStatus getFileLinkStatus(final Path f) throws IOException {
+    if (this.vfs == null) {
+      return super.getFileLinkStatus(f);
+    }
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
+        this.vfs.getMountPathInfo(f, getConf());
+    return mountPathInfo.getTargetFs()
+        .getFileLinkStatus(mountPathInfo.getPathOnTarget());
+  }
+
+  @Override
+  public Path getLinkTarget(Path path) throws IOException {
+    if (this.vfs == null) {
+      return super.getLinkTarget(path);
+    }
+    return this.vfs.getLinkTarget(path);
+  }
+
+  @Override
+  protected Path resolveLink(Path f) throws IOException {
+    if (this.vfs == null) {
+      return super.resolveLink(f);
+    }
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
+        this.vfs.getMountPathInfo(f, getConf());
+    checkDFS(mountPathInfo.getTargetFs(), "resolveLink");
+    return ((DistributedFileSystem) mountPathInfo.getTargetFs())
+        .resolveLink(mountPathInfo.getPathOnTarget());
+  }
+
+  @Override
+  public FileChecksum getFileChecksum(final Path f)
+      throws AccessControlException, FileNotFoundException, IOException {
+    if (this.vfs == null) {
+      return super.getFileChecksum(f);
+    }
+    return this.vfs.getFileChecksum(f);
+  }
+
+  @Override
+  public void setPermission(final Path f, final FsPermission permission)
+      throws AccessControlException, FileNotFoundException, IOException {
+    if (this.vfs == null) {
+      super.setPermission(f, permission);
+      return;
+    }
+    this.vfs.setPermission(f, permission);
+  }
+
+  @Override
+  public void setOwner(final Path f, final String username,
+      final String groupname)
+      throws AccessControlException, FileNotFoundException, IOException {
+    if (this.vfs == null) {
+      super.setOwner(f, username, groupname);
+      return;
+    }
+    this.vfs.setOwner(f, username, groupname);
+  }
+
+  @Override
+  public void setTimes(final Path f, final long mtime, final long atime)
+      throws AccessControlException, FileNotFoundException, IOException {
+    if (this.vfs == null) {
+      super.setTimes(f, mtime, atime);
+      return;
+    }
+    this.vfs.setTimes(f, mtime, atime);
+  }
+
+  @Override
+  // DFS specific API
+  protected int getDefaultPort() {
+    return super.getDefaultPort();
+  }
+
+  @Override
+  public Token<DelegationTokenIdentifier> getDelegationToken(String renewer)
+      throws IOException {
+    if (this.vfs == null) {
+      return super.getDelegationToken(renewer);
+    }
+    //Let applications call getDelegationTokenIssuers and get respective
+    // delegation tokens from child fs.
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setBalancerBandwidth(long bandwidth) throws IOException {
+    if (this.vfs == null) {
+      super.setBalancerBandwidth(bandwidth);
+      return;
+    }
+    checkDefaultDFS(defaultDFS, "setBalancerBandwidth");
+    defaultDFS.setBalancerBandwidth(bandwidth);
+  }
+
+  @Override
+  public String getCanonicalServiceName() {
+    if (this.vfs == null) {
+      return super.getCanonicalServiceName();
+    }
+    checkDefaultDFS(defaultDFS, "getCanonicalServiceName");
+    return defaultDFS.getCanonicalServiceName();
+  }
+
+  @Override
+  protected URI canonicalizeUri(URI uri) {
+    if (this.vfs == null) {
+      return super.canonicalizeUri(uri);
+    }
+
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo = null;
+    try {
+      mountPathInfo = this.vfs.getMountPathInfo(new Path(uri), getConf());
+    } catch (IOException e) {
+      LOGGER.warn("Failed to resolve the uri as mount path", e);
+      return null;
+    }
+    checkDFS(mountPathInfo.getTargetFs(), "canonicalizeUri");
+    return ((DistributedFileSystem) mountPathInfo.getTargetFs())
+        .canonicalizeUri(uri);
+  }
+
+  @Override
+  public boolean isInSafeMode() throws IOException {
+    if (this.vfs == null) {
+      return super.isInSafeMode();
+    }
+    checkDefaultDFS(defaultDFS, "isInSafeMode");
+    return defaultDFS.isInSafeMode();
+  }
+
+  @Override
+  // DFS specific API
+  public void allowSnapshot(Path path) throws IOException {
+    if (this.vfs == null) {
+      super.allowSnapshot(path);
+      return;
+    }
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
+        this.vfs.getMountPathInfo(path, getConf());
+    checkDFS(mountPathInfo.getTargetFs(), "allowSnapshot");
+    ((DistributedFileSystem) mountPathInfo.getTargetFs())
+        .allowSnapshot(mountPathInfo.getPathOnTarget());
+  }
+
+  @Override
+  public void disallowSnapshot(final Path path) throws IOException {
+    if (this.vfs == null) {
+      super.disallowSnapshot(path);
+      return;
+    }
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
+        this.vfs.getMountPathInfo(path, getConf());
+    checkDFS(mountPathInfo.getTargetFs(), "disallowSnapshot");
+    ((DistributedFileSystem) mountPathInfo.getTargetFs())
+        .disallowSnapshot(mountPathInfo.getPathOnTarget());
+  }
+
+  @Override
+  public Path createSnapshot(Path path, String snapshotName)
+      throws IOException {
+    if (this.vfs == null) {
+      return super.createSnapshot(path, snapshotName);
+    }
+    return this.vfs.createSnapshot(path, snapshotName);
+  }
+
+  @Override
+  public void renameSnapshot(Path path, String snapshotOldName,
+      String snapshotNewName) throws IOException {
+    if (this.vfs == null) {
+      super.renameSnapshot(path, snapshotOldName, snapshotNewName);
+      return;
+    }
+    this.vfs.renameSnapshot(path, snapshotOldName, snapshotNewName);
+  }
+
+  @Override
+  //Only for HDFS users
+  public SnapshottableDirectoryStatus[] getSnapshottableDirListing()
+      throws IOException {
+    if (this.vfs == null) {
+      return super.getSnapshottableDirListing();
+    }
+    checkDefaultDFS(defaultDFS, "getSnapshottableDirListing");
+    return defaultDFS.getSnapshottableDirListing();
+  }
+
+  @Override
+  public void deleteSnapshot(Path path, String snapshotName)
+      throws IOException {
+    if (this.vfs == null) {
+      super.deleteSnapshot(path, snapshotName);
+      return;
+    }
+    this.vfs.deleteSnapshot(path, snapshotName);
+  }
+
+  @Override
+  public RemoteIterator<SnapshotDiffReportListing> snapshotDiffReportListingRemoteIterator(
+      final Path snapshotDir, final String fromSnapshot,
+      final String toSnapshot) throws IOException {
+    if (this.vfs == null) {
+      return super
+          .snapshotDiffReportListingRemoteIterator(snapshotDir, fromSnapshot,
+              toSnapshot);
+    }
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
+        this.vfs.getMountPathInfo(snapshotDir, getConf());
+    checkDFS(mountPathInfo.getTargetFs(),
+        "snapshotDiffReportListingRemoteIterator");
+    return ((DistributedFileSystem) mountPathInfo.getTargetFs())
+        .snapshotDiffReportListingRemoteIterator(
+            mountPathInfo.getPathOnTarget(), fromSnapshot, toSnapshot);
+  }
+
+  @Override
+  public SnapshotDiffReport getSnapshotDiffReport(final Path snapshotDir,
+      final String fromSnapshot, final String toSnapshot) throws IOException {
+    if (this.vfs == null) {
+      return super.getSnapshotDiffReport(snapshotDir, fromSnapshot, toSnapshot);
+    }
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
+        this.vfs.getMountPathInfo(snapshotDir, getConf());
+    checkDFS(mountPathInfo.getTargetFs(), "getSnapshotDiffReport");
+    return ((DistributedFileSystem) mountPathInfo.getTargetFs())
+        .getSnapshotDiffReport(mountPathInfo.getPathOnTarget(), fromSnapshot,
+            toSnapshot);
+  }
+
+  @Override
+  public boolean isFileClosed(final Path src) throws IOException {
+    if (this.vfs == null) {
+      return super.isFileClosed(src);
+    }
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
+        this.vfs.getMountPathInfo(src, getConf());
+    checkDFS(mountPathInfo.getTargetFs(), "isFileClosed");
+    return ((DistributedFileSystem) mountPathInfo.getTargetFs())
+        .isFileClosed(mountPathInfo.getPathOnTarget());
+  }
+
+  @Override
+  public long addCacheDirective(CacheDirectiveInfo info) throws IOException {
+    if (this.vfs == null) {
+      return super.addCacheDirective(info);
+    }
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
+        this.vfs.getMountPathInfo(info.getPath(), getConf());
+    checkDFS(mountPathInfo.getTargetFs(), "addCacheDirective");
+
+    return ((DistributedFileSystem) mountPathInfo.getTargetFs())
+        .addCacheDirective(new CacheDirectiveInfo.Builder(info)
+            .setPath(mountPathInfo.getPathOnTarget()).build());
+  }
+
+  @Override
+  public long addCacheDirective(CacheDirectiveInfo info,
+      EnumSet<CacheFlag> flags) throws IOException {
+    if (this.vfs == null) {
+      return super.addCacheDirective(info, flags);
+    }
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
+        this.vfs.getMountPathInfo(info.getPath(), getConf());
+    checkDFS(mountPathInfo.getTargetFs(), "addCacheDirective");
+
+    return ((DistributedFileSystem) mountPathInfo.getTargetFs())
+        .addCacheDirective(new CacheDirectiveInfo.Builder(info)
+            .setPath(mountPathInfo.getPathOnTarget()).build(), flags);
+  }
+
+  @Override
+  public void modifyCacheDirective(CacheDirectiveInfo info) throws IOException {
+    if (this.vfs == null) {
+      super.modifyCacheDirective(info);
+      return;
+    }
+    if (info.getPath() != null) {
+      ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
+          this.vfs.getMountPathInfo(info.getPath(), getConf());
+      checkDFS(mountPathInfo.getTargetFs(), "modifyCacheDirective");
+      ((DistributedFileSystem) mountPathInfo.getTargetFs())
+          .modifyCacheDirective(new CacheDirectiveInfo.Builder(info)
+              .setPath(mountPathInfo.getPathOnTarget()).build());
+      return;
+    }
+
+    // No path available in CacheDirectiveInfo. Let's shoot to all child fs.
+    List<IOException> failedExceptions = new ArrayList<>();
+    boolean isDFSExistsInChilds = false;
+
+    for (FileSystem fs : getChildFileSystems()) {
+      if (!(fs instanceof DistributedFileSystem)) {
+        continue;
+      }
+      isDFSExistsInChilds = true;
+      DistributedFileSystem dfs = (DistributedFileSystem) fs;
+      try {
+        dfs.modifyCacheDirective(info);
+      } catch (IOException ioe) {
+        failedExceptions.add(ioe);
+      }
+    }
+    if (!isDFSExistsInChilds) {
+      throw new UnsupportedOperationException(
+          "No DFS available in child file systems.");
+    }
+    if (failedExceptions.size() > 0) {
+      throw MultipleIOException.createIOException(failedExceptions);
+    }
+  }
+
+  @Override
+  public void modifyCacheDirective(CacheDirectiveInfo info,
+      EnumSet<CacheFlag> flags) throws IOException {
+    if (this.vfs == null) {
+      super.modifyCacheDirective(info, flags);
+      return;
+    }
+    if (info.getPath() != null) {
+      ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
+          this.vfs.getMountPathInfo(info.getPath(), getConf());
+      checkDFS(mountPathInfo.getTargetFs(), "modifyCacheDirective");
+      ((DistributedFileSystem) mountPathInfo.getTargetFs())
+          .modifyCacheDirective(new CacheDirectiveInfo.Builder(info)
+              .setPath(mountPathInfo.getPathOnTarget()).build(), flags);
+      return;
+    }
+    // No path available in CacheDirectiveInfo. Let's shoot to all child fs.
+    List<IOException> failedExceptions = new ArrayList<>();
+    boolean isDFSExistsInChilds = false;
+    for (FileSystem fs : getChildFileSystems()) {
+      if (!(fs instanceof DistributedFileSystem)) {
+        continue;
+      }
+      isDFSExistsInChilds = true;
+      DistributedFileSystem dfs = (DistributedFileSystem) fs;
+      try {
+        dfs.modifyCacheDirective(info, flags);
+      } catch (IOException ioe) {
+        failedExceptions.add(ioe);
+      }
+    }
+    if (!isDFSExistsInChilds) {
+      throw new UnsupportedOperationException(
+          "No DFS available in child file systems.");
+    }
+    if (failedExceptions.size() > 0) {
+      throw MultipleIOException.createIOException(failedExceptions);
+    }
+  }
+
+  @Override
+  public void removeCacheDirective(long id) throws IOException {
+    if (this.vfs == null) {
+      super.removeCacheDirective(id);
+      return;
+    }
+    List<IOException> failedExceptions = new ArrayList<>();
+    boolean isDFSExistsInChilds = false;
+
+    for (FileSystem fs : getChildFileSystems()) {
+      if (!(fs instanceof DistributedFileSystem)) {
+        continue;
+      }
+      isDFSExistsInChilds = true;
+      DistributedFileSystem dfs = (DistributedFileSystem) fs;
+      try {
+        dfs.removeCacheDirective(id);
+      } catch (IOException ioe) {
+        failedExceptions.add(ioe);
+      }
+    }
+    if (!isDFSExistsInChilds) {
+      throw new UnsupportedOperationException(
+          "No DFS available in child file systems.");
+    }
+    if (failedExceptions.size() > 0) {
+      throw MultipleIOException.createIOException(failedExceptions);
+    }
+  }
+
+  @Override
+  public RemoteIterator<CacheDirectiveEntry> listCacheDirectives(
+      CacheDirectiveInfo filter) throws IOException {
+    if (this.vfs == null) {
+      return super.listCacheDirectives(filter);
+    }
+
+    if (filter != null && filter.getPath() != null) {
+      ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
+          this.vfs.getMountPathInfo(filter.getPath(), getConf());
+      checkDFS(mountPathInfo.getTargetFs(), "listCacheDirectives");
+      return ((DistributedFileSystem) mountPathInfo.getTargetFs())
+          .listCacheDirectives(new CacheDirectiveInfo.Builder(filter)
+              .setPath(mountPathInfo.getPathOnTarget()).build());
+    }
+
+    // No path available in filter. Let's try to shoot to all child fs.
+    final List<RemoteIterator<CacheDirectiveEntry>> iters = new ArrayList<>();
+    for (FileSystem fs : getChildFileSystems()) {
+      if (fs instanceof DistributedFileSystem) {
+        iters.add(((DistributedFileSystem) fs).listCacheDirectives(filter));
+      }
+    }
+    if (iters.size() == 0) {
+      throw new UnsupportedOperationException(
+          "No DFS found in child fs. This API can't be supported in non DFS");
+    }
+
+    return new RemoteIterator<CacheDirectiveEntry>() {
+      int currIdx = 0;
+      RemoteIterator<CacheDirectiveEntry> currIter = iters.get(currIdx++);
+
+      @Override
+      public boolean hasNext() throws IOException {
+        if (currIter.hasNext()) {
+          return true;
+        }
+        while (currIdx < iters.size()) {
+          currIter = iters.get(currIdx++);
+          if (currIter.hasNext()) {
+            return true;
+          }
+        }
+        return false;
+      }
+
+      @Override
+      public CacheDirectiveEntry next() throws IOException {
+        if (hasNext()) {
+          return currIter.next();
+        }
+        throw new NoSuchElementException("No more elements");
+      }
+    };
+  }
+
+  // Cache pool APIs currently fan out to all child DFS clusters.
+  @Override
+  public void addCachePool(CachePoolInfo info) throws IOException {
+    if (this.vfs == null) {
+      super.addCachePool(info);
+      return;
+    }
+    List<IOException> failedExceptions = new ArrayList<>();
+    boolean isDFSExistsInChilds = false;
+
+    for (FileSystem fs : getChildFileSystems()) {
+      if (!(fs instanceof DistributedFileSystem)) {
+        continue;
+      }
+      isDFSExistsInChilds = true;
+      DistributedFileSystem dfs = (DistributedFileSystem) fs;
+      try {
+        dfs.addCachePool(info);
+      } catch (IOException ioe) {
+        failedExceptions.add(ioe);
+      }
+    }
+    if (!isDFSExistsInChilds) {
+      throw new UnsupportedOperationException(
+          "No DFS available in child file systems.");
+    }
+    if (failedExceptions.size() > 0) {
+      throw MultipleIOException.createIOException(failedExceptions);
+    }
+  }
+
+  @Override
+  public void modifyCachePool(CachePoolInfo info) throws IOException {
+    if (this.vfs == null) {
+      super.modifyCachePool(info);
+      return;
+    }
+    List<IOException> failedExceptions = new ArrayList<>();
+    boolean isDFSExistsInChilds = false;
+
+    for (FileSystem fs : getChildFileSystems()) {
+      if (!(fs instanceof DistributedFileSystem)) {
+        continue;
+      }
+      isDFSExistsInChilds = true;
+      DistributedFileSystem dfs = (DistributedFileSystem) fs;
+      try {
+        dfs.modifyCachePool(info);
+      } catch (IOException ioe) {
+        failedExceptions.add(ioe);
+      }
+    }
+    if (!isDFSExistsInChilds) {
+      throw new UnsupportedOperationException(
+          "No DFS available in child file systems.");
+    }
+    if (failedExceptions.size() > 0) {
+      throw MultipleIOException.createIOException(failedExceptions);
+    }
+  }
+
+  @Override
+  public void removeCachePool(String poolName) throws IOException {
+    if (this.vfs == null) {
+      super.removeCachePool(poolName);
+      return;
+    }
+    List<IOException> failedExceptions = new ArrayList<>();
+    boolean isDFSExistsInChilds = false;
+
+    for (FileSystem fs : getChildFileSystems()) {
+      if (!(fs instanceof DistributedFileSystem)) {
+        continue;
+      }
+      isDFSExistsInChilds = true;
+      DistributedFileSystem dfs = (DistributedFileSystem) fs;
+      try {
+        dfs.removeCachePool(poolName);
+      } catch (IOException ioe) {
+        failedExceptions.add(ioe);
+      }
+    }
+    if (!isDFSExistsInChilds) {
+      throw new UnsupportedOperationException(
+          "No DFS available in child file systems.");
+    }
+    if (failedExceptions.size() > 0) {
+      throw MultipleIOException.createIOException(failedExceptions);
+    }
+  }
+
+  @Override
+  public RemoteIterator<CachePoolEntry> listCachePools() throws IOException {
+    if (this.vfs == null) {
+      return super.listCachePools();
+    }
+
+    List<DistributedFileSystem> childDFSs = new ArrayList<>();
+    for (FileSystem fs : getChildFileSystems()) {
+      if (fs instanceof DistributedFileSystem) {
+        childDFSs.add((DistributedFileSystem) fs);
+      }
+    }
+    if (childDFSs.size() == 0) {
+      throw new UnsupportedOperationException(
+          "No DFS found in child fs. This API can't be supported in non DFS");
+    }
+    return new RemoteIterator<CachePoolEntry>() {
+      int curDfsIdx = 0;
+      RemoteIterator<CachePoolEntry> currIter =
+          childDFSs.get(curDfsIdx++).listCachePools();
+
+      @Override
+      public boolean hasNext() throws IOException {
+        if (currIter.hasNext()) {
+          return true;
+        }
+        while (curDfsIdx < childDFSs.size()) {
+          currIter = childDFSs.get(curDfsIdx++).listCachePools();
+          if (currIter.hasNext()) {
+            return true;
+          }
+        }
+        return false;
+      }
+
+      @Override
+      public CachePoolEntry next() throws IOException {
+        if (hasNext()) {
+          return currIter.next();
+        }
+        throw new NoSuchElementException("No more entries");
+      }
+    };
+  }
+
+  @Override
+  public void modifyAclEntries(Path path, List<AclEntry> aclSpec)
+      throws IOException {
+    if (this.vfs == null) {
+      super.modifyAclEntries(path, aclSpec);
+      return;
+    }
+    this.vfs.modifyAclEntries(path, aclSpec);
+  }
+
+  @Override
+  public void removeAclEntries(Path path, List<AclEntry> aclSpec)
+      throws IOException {
+    if (this.vfs == null) {
+      super.removeAclEntries(path, aclSpec);
+      return;
+    }
+    this.vfs.removeAclEntries(path, aclSpec);
+  }
+
+  @Override
+  public void removeDefaultAcl(Path path) throws IOException {
+    if (this.vfs == null) {
+      super.removeDefaultAcl(path);
+      return;
+    }
+    this.vfs.removeDefaultAcl(path);
+  }
+
+  @Override
+  public void removeAcl(Path path) throws IOException {
+    if (this.vfs == null) {
+      super.removeAcl(path);
+      return;
+    }
+    this.vfs.removeAcl(path);
+  }
+
+  @Override
+  public void setAcl(Path path, List<AclEntry> aclSpec) throws IOException {
+    if (this.vfs == null) {
+      super.setAcl(path, aclSpec);
+      return;
+    }
+    this.vfs.setAcl(path, aclSpec);
+  }
+
+  @Override
+  public AclStatus getAclStatus(Path path) throws IOException {
+    if (this.vfs == null) {
+      return super.getAclStatus(path);
+    }
+    return this.vfs.getAclStatus(path);
+  }
+
+  @Override
+  public void createEncryptionZone(final Path path, final String keyName)
+      throws IOException {
+    if (this.vfs == null) {
+      super.createEncryptionZone(path, keyName);
+      return;
+    }
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
+        this.vfs.getMountPathInfo(path, getConf());
+    checkDFS(mountPathInfo.getTargetFs(), "createEncryptionZone");
+    ((DistributedFileSystem) mountPathInfo.getTargetFs())
+        .createEncryptionZone(mountPathInfo.getPathOnTarget(), keyName);
+  }
+
+  @Override
+  public EncryptionZone getEZForPath(final Path path) throws IOException {
+    if (this.vfs == null) {
+      return super.getEZForPath(path);
+    }
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
+        this.vfs.getMountPathInfo(path, getConf());
+    checkDFS(mountPathInfo.getTargetFs(), "getEZForPath");
+    return ((DistributedFileSystem) mountPathInfo.getTargetFs())
+        .getEZForPath(mountPathInfo.getPathOnTarget());
+  }
+
+  /**
+   * Returns the results from default DFS (fallback). If you want the results
+   * from specific clusters, please invoke them on the child fs instances directly.
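+   * <p>
+   * To target one specific cluster instead (an illustrative sketch, not part
+   * of this patch), invoke the API on its child file system directly:
+   * <pre>
+   *   for (FileSystem child : fs.getChildFileSystems()) {
+   *     if (child instanceof DistributedFileSystem) {
+   *       ((DistributedFileSystem) child).listEncryptionZones();
+   *     }
+   *   }
+   * </pre>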
+   */
+  @Override
+  public RemoteIterator<EncryptionZone> listEncryptionZones()
+      throws IOException {
+    if (this.vfs == null) {
+      return super.listEncryptionZones();
+    }
+    checkDefaultDFS(defaultDFS, "listEncryptionZones");
+    return defaultDFS.listEncryptionZones();
+  }
+
+  @Override
+  public void reencryptEncryptionZone(final Path zone,
+      final HdfsConstants.ReencryptAction action) throws IOException {
+    if (this.vfs == null) {
+      super.reencryptEncryptionZone(zone, action);
+      return;
+    }
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
+        this.vfs.getMountPathInfo(zone, getConf());
+    checkDFS(mountPathInfo.getTargetFs(), "reencryptEncryptionZone");
+    ((DistributedFileSystem) mountPathInfo.getTargetFs())
+        .reencryptEncryptionZone(mountPathInfo.getPathOnTarget(), action);
+  }
+
+  /**
+   * Returns the results from default DFS (fallback). If you want the results
+   * from specific clusters, please invoke them on the child fs instances directly.
+   */
+  @Override
+  public RemoteIterator<ZoneReencryptionStatus> listReencryptionStatus()
+      throws IOException {
+    if (this.vfs == null) {
+      return super.listReencryptionStatus();
+    }
+    checkDefaultDFS(defaultDFS, "listReencryptionStatus");
+    return defaultDFS.listReencryptionStatus();
+  }
+
+  @Override
+  public FileEncryptionInfo getFileEncryptionInfo(final Path path)
+      throws IOException {
+    if (this.vfs == null) {
+      return super.getFileEncryptionInfo(path);
+    }
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
+        this.vfs.getMountPathInfo(path, getConf());
+    checkDFS(mountPathInfo.getTargetFs(), "getFileEncryptionInfo");
+    return ((DistributedFileSystem) mountPathInfo.getTargetFs())
+        .getFileEncryptionInfo(mountPathInfo.getPathOnTarget());
+  }
+
+  @Override
+  public void provisionEZTrash(final Path path,
+      final FsPermission trashPermission) throws IOException {
+    if (this.vfs == null) {
+      super.provisionEZTrash(path, trashPermission);
+      return;
+    }
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
+        this.vfs.getMountPathInfo(path, getConf());
+    checkDFS(mountPathInfo.getTargetFs(), "provisionEZTrash");
+    ((DistributedFileSystem) mountPathInfo.getTargetFs())
+        .provisionEZTrash(mountPathInfo.getPathOnTarget(), trashPermission);
+  }
+
+  @Override
+  public void setXAttr(Path path, String name, byte[] value,
+      EnumSet<XAttrSetFlag> flag) throws IOException {
+    if (this.vfs == null) {
+      super.setXAttr(path, name, value, flag);
+      return;
+    }
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
+        this.vfs.getMountPathInfo(path, getConf());
+    mountPathInfo.getTargetFs()
+        .setXAttr(mountPathInfo.getPathOnTarget(), name, value, flag);
+  }
+
+  @Override
+  public byte[] getXAttr(Path path, String name) throws IOException {
+    if (this.vfs == null) {
+      return super.getXAttr(path, name);
+    }
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
+        this.vfs.getMountPathInfo(path, getConf());
+    return mountPathInfo.getTargetFs()
+        .getXAttr(mountPathInfo.getPathOnTarget(), name);
+  }
+
+  @Override
+  public Map<String, byte[]> getXAttrs(Path path) throws IOException {
+    if (this.vfs == null) {
+      return super.getXAttrs(path);
+    }
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
+        this.vfs.getMountPathInfo(path, getConf());
+    return mountPathInfo.getTargetFs()
+        .getXAttrs(mountPathInfo.getPathOnTarget());
+  }
+
+  @Override
+  public Map<String, byte[]> getXAttrs(Path path, List<String> names)
+      throws IOException {
+    if (this.vfs == null) {
+      return super.getXAttrs(path, names);
+    }
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
+        this.vfs.getMountPathInfo(path, getConf());
+    return mountPathInfo.getTargetFs()
+        .getXAttrs(mountPathInfo.getPathOnTarget(), names);
+  }
+
+  @Override
+  public List<String> listXAttrs(Path path) throws IOException {
+    if (this.vfs == null) {
+      return super.listXAttrs(path);
+    }
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
+        this.vfs.getMountPathInfo(path, getConf());
+    return mountPathInfo.getTargetFs()
+        .listXAttrs(mountPathInfo.getPathOnTarget());
+  }
+
+  @Override
+  public void removeXAttr(Path path, String name) throws IOException {
+    if (this.vfs == null) {
+      super.removeXAttr(path, name);
+      return;
+    }
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
+        this.vfs.getMountPathInfo(path, getConf());
+    mountPathInfo.getTargetFs()
+        .removeXAttr(mountPathInfo.getPathOnTarget(), name);
+  }
+
+  @Override
+  public void access(Path path, FsAction mode)
+      throws AccessControlException, FileNotFoundException, IOException {
+    if (this.vfs == null) {
+      super.access(path, mode);
+      return;
+    }
+    this.vfs.access(path, mode);
+  }
+
+  @Override
+  public URI getKeyProviderUri() throws IOException {
+    if (this.vfs == null) {
+      return super.getKeyProviderUri();
+    }
+    checkDefaultDFS(defaultDFS, "getKeyProviderUri");
+    return defaultDFS.getKeyProviderUri();
+  }
+
+  @Override
+  public KeyProvider getKeyProvider() throws IOException {
+    if (this.vfs == null) {
+      return super.getKeyProvider();
+    }
+    checkDefaultDFS(defaultDFS, "getKeyProvider");
+    return defaultDFS.getKeyProvider();
+  }
+
+  @Override
+  public DelegationTokenIssuer[] getAdditionalTokenIssuers()
+      throws IOException {
+    if (this.vfs == null) {
+      return super.getAdditionalTokenIssuers();
+    }
+
+    return this.vfs.getChildFileSystems();
+  }
+
+  @Override
+  public DFSInotifyEventInputStream getInotifyEventStream() throws IOException {
+    if (this.vfs == null) {
+      return super.getInotifyEventStream();
+    }
+    checkDefaultDFS(defaultDFS, "getInotifyEventStream");
+    return defaultDFS.getInotifyEventStream();
+  }
+
+  @Override
+  public DFSInotifyEventInputStream getInotifyEventStream(long lastReadTxid)
+      throws IOException {
+    if (this.vfs == null) {
+      return super.getInotifyEventStream(lastReadTxid);
+    }
+    checkDefaultDFS(defaultDFS, "getInotifyEventStream");
+    return defaultDFS.getInotifyEventStream(lastReadTxid);
+  }
+
+  @Override
+  // DFS only API.
+  public void setErasureCodingPolicy(final Path path, final String ecPolicyName)
+      throws IOException {
+    if (this.vfs == null) {
+      super.setErasureCodingPolicy(path, ecPolicyName);
+      return;
+    }
+
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
+        this.vfs.getMountPathInfo(path, getConf());
+    checkDFS(mountPathInfo.getTargetFs(), "setErasureCodingPolicy");
+    ((DistributedFileSystem) mountPathInfo.getTargetFs())
+        .setErasureCodingPolicy(mountPathInfo.getPathOnTarget(), ecPolicyName);
+  }
+
+  @Override
+  public void satisfyStoragePolicy(Path src) throws IOException {
+    if (this.vfs == null) {
+      super.satisfyStoragePolicy(src);
+      return;
+    }
+    this.vfs.satisfyStoragePolicy(src);
+  }
+
+  @Override
+  public ErasureCodingPolicy getErasureCodingPolicy(final Path path)
+      throws IOException {
+    if (this.vfs == null) {
+      return super.getErasureCodingPolicy(path);
+    }
+
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
+        this.vfs.getMountPathInfo(path, getConf());
+    checkDFS(mountPathInfo.getTargetFs(), "getErasureCodingPolicy");
+    return ((DistributedFileSystem) mountPathInfo.getTargetFs())
+        .getErasureCodingPolicy(mountPathInfo.getPathOnTarget());
+  }
+
+  /**
+   * Gets all erasure coding policies from all available child file systems.
+   */
+  @Override
+  public Collection<ErasureCodingPolicyInfo> getAllErasureCodingPolicies()
+      throws IOException {
+    if (this.vfs == null) {
+      return super.getAllErasureCodingPolicies();
+    }
+    FileSystem[] childFss = getChildFileSystems();
+    List<ErasureCodingPolicyInfo> results = new ArrayList<>();
+    List<IOException> failedExceptions = new ArrayList<>();
+    boolean isDFSExistsInChilds = false;
+    for (FileSystem fs : childFss) {
+      if (!(fs instanceof DistributedFileSystem)) {
+        continue;
+      }
+      isDFSExistsInChilds = true;
+      DistributedFileSystem dfs = (DistributedFileSystem) fs;
+      try {
+        results.addAll(dfs.getAllErasureCodingPolicies());
+      } catch (IOException ioe) {
+        failedExceptions.add(ioe);
+      }
+    }
+
+    if (!isDFSExistsInChilds) {
+      throw new UnsupportedOperationException(
+          "No DFS available in child file systems.");
+    }
+    if (failedExceptions.size() > 0) {
+      throw MultipleIOException.createIOException(failedExceptions);
+    }
+    return results;
+  }
+
+  @Override
+  public Map<String, String> getAllErasureCodingCodecs() throws IOException {
+    if (this.vfs == null) {
+      return super.getAllErasureCodingCodecs();
+    }
+    FileSystem[] childFss = getChildFileSystems();
+    Map<String, String> results = new HashMap<>();
+    List<IOException> failedExceptions = new ArrayList<>();
+    boolean isDFSExistsInChilds = false;
+    for (FileSystem fs : childFss) {
+      if (!(fs instanceof DistributedFileSystem)) {
+        continue;
+      }
+      isDFSExistsInChilds = true;
+      DistributedFileSystem dfs = (DistributedFileSystem) fs;
+      try {
+        results.putAll(dfs.getAllErasureCodingCodecs());
+      } catch (IOException ioe) {
+        failedExceptions.add(ioe);
+      }
+    }
+    if (!isDFSExistsInChilds) {
+      throw new UnsupportedOperationException(
+          "No DFS available in child file systems.");
+    }
+    if (failedExceptions.size() > 0) {
+      throw MultipleIOException.createIOException(failedExceptions);
+    }
+    return results;
+  }
+
+  @Override
+  public AddErasureCodingPolicyResponse[] addErasureCodingPolicies(
+      ErasureCodingPolicy[] policies) throws IOException {
+    if (this.vfs == null) {
+      return super.addErasureCodingPolicies(policies);
+    }
+    List<IOException> failedExceptions = new ArrayList<>();
+    List<AddErasureCodingPolicyResponse> results = new ArrayList<>();
+    boolean isDFSExistsInChilds = false;
+    for (FileSystem fs : getChildFileSystems()) {
+      if (!(fs instanceof DistributedFileSystem)) {
+        continue;
+      }
+      isDFSExistsInChilds = true;
+      DistributedFileSystem dfs = (DistributedFileSystem) fs;
+      try {
+        results.addAll(Arrays.asList(dfs.addErasureCodingPolicies(policies)));
+      } catch (IOException ioe) {
+        failedExceptions.add(ioe);
+      }
+    }
+    if (!isDFSExistsInChilds) {
+      throw new UnsupportedOperationException(
+          "No DFS available in child file systems.");
+    }
+    if (failedExceptions.size() > 0) {
+      throw MultipleIOException.createIOException(failedExceptions);
+    }
+    return results.toArray(new AddErasureCodingPolicyResponse[results.size()]);
+  }
+
+  @Override
+  public void removeErasureCodingPolicy(String ecPolicyName)
+      throws IOException {
+    if (this.vfs == null) {
+      super.removeErasureCodingPolicy(ecPolicyName);
+      return;
+    }
+
+    List<IOException> failedExceptions = new ArrayList<>();
+    boolean isDFSExistsInChilds = false;
+
+    for (FileSystem fs : getChildFileSystems()) {
+      if (!(fs instanceof DistributedFileSystem)) {
+        continue;
+      }
+      isDFSExistsInChilds = true;
+      DistributedFileSystem dfs = (DistributedFileSystem) fs;
+      try {
+        dfs.removeErasureCodingPolicy(ecPolicyName);
+      } catch (IOException ioe) {
+        failedExceptions.add(ioe);
+      }
+    }
+    if (!isDFSExistsInChilds) {
+      throw new UnsupportedOperationException(
+          "No DFS available in child file systems.");
+    }
+    if (failedExceptions.size() > 0) {
+      throw MultipleIOException.createIOException(failedExceptions);
+    }
+  }
+
+  @Override
+  public void enableErasureCodingPolicy(String ecPolicyName)
+      throws IOException {
+    if (this.vfs == null) {
+      super.enableErasureCodingPolicy(ecPolicyName);
+      return;
+    }
+    List<IOException> failedExceptions = new ArrayList<>();
+    boolean isDFSExistsInChilds = false;
+
+    for (FileSystem fs : getChildFileSystems()) {
+      if (!(fs instanceof DistributedFileSystem)) {
+        continue;
+      }
+      isDFSExistsInChilds = true;
+      DistributedFileSystem dfs = (DistributedFileSystem) fs;
+      try {
+        dfs.enableErasureCodingPolicy(ecPolicyName);
+      } catch (IOException ioe) {
+        failedExceptions.add(ioe);
+      }
+    }
+    if (!isDFSExistsInChilds) {
+      throw new UnsupportedOperationException(
+          "No DFS available in child file systems.");
+    }
+    if (failedExceptions.size() > 0) {
+      throw MultipleIOException.createIOException(failedExceptions);
+    }
+  }
+
+  @Override
+  public void disableErasureCodingPolicy(String ecPolicyName)
+      throws IOException {
+    if (this.vfs == null) {
+      super.disableErasureCodingPolicy(ecPolicyName);
+      return;
+    }
+    List<IOException> failedExceptions = new ArrayList<>();
+    boolean isDFSExistsInChilds = false;
+
+    for (FileSystem fs : getChildFileSystems()) {
+      if (!(fs instanceof DistributedFileSystem)) {
+        continue;
+      }
+      isDFSExistsInChilds = true;
+      DistributedFileSystem dfs = (DistributedFileSystem) fs;
+      try {
+        dfs.disableErasureCodingPolicy(ecPolicyName);
+      } catch (IOException ioe) {
+        failedExceptions.add(ioe);
+      }
+    }
+    if (!isDFSExistsInChilds) {
+      throw new UnsupportedOperationException(
+          "No DFS available in child file systems.");
+    }
+    if (failedExceptions.size() > 0) {
+      throw MultipleIOException.createIOException(failedExceptions);
+    }
+  }
+
+  @Override
+  public void unsetErasureCodingPolicy(final Path path) throws IOException {
+    if (this.vfs == null) {
+      super.unsetErasureCodingPolicy(path);
+      return;
+    }
+
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
+        this.vfs.getMountPathInfo(path, getConf());
+    checkDFS(mountPathInfo.getTargetFs(), "unsetErasureCodingPolicy");
+    ((DistributedFileSystem) mountPathInfo.getTargetFs())
+        .unsetErasureCodingPolicy(mountPathInfo.getPathOnTarget());
+  }
+
+  @Override
+  public ECTopologyVerifierResult getECTopologyResultForPolicies(
+      final String... policyNames) throws IOException {
+    if (this.vfs == null) {
+      return super.getECTopologyResultForPolicies(policyNames);
+    }
+
+    List<IOException> failedExceptions = new ArrayList<>();
+    ECTopologyVerifierResult result = null;
+    for (FileSystem fs : getChildFileSystems()) {
+      if (!(fs instanceof DistributedFileSystem)) {
+        continue;
+      }
+      DistributedFileSystem dfs = (DistributedFileSystem) fs;
+      try {
+        result = dfs.getECTopologyResultForPolicies(policyNames);
+        if (!result.isSupported()) {
+          // Return immediately on the first negative result.
+          return result;
+        }
+      } catch (IOException ioe) {
+        failedExceptions.add(ioe);
+      }
+    }
+    if (result == null) {
+      throw new UnsupportedOperationException(
+          "No DFS available in child filesystems");
+    }
+    if (failedExceptions.size() > 0) {
+      throw MultipleIOException.createIOException(failedExceptions);
+    }
+    // All child clusters reported support; return the last result.
+    return result;
+  }
+
+  @Override
+  public Path getTrashRoot(Path path) {
+    if (this.vfs == null) {
+      return super.getTrashRoot(path);
+    }
+    return this.vfs.getTrashRoot(path);
+  }
+
+  @Override
+  public Collection<FileStatus> getTrashRoots(boolean allUsers) {
+    if (this.vfs == null) {
+      return super.getTrashRoots(allUsers);
+    }
+    List<FileStatus> trashRoots = new ArrayList<>();
+    for (FileSystem fs : getChildFileSystems()) {
+      trashRoots.addAll(fs.getTrashRoots(allUsers));
+    }
+    return trashRoots;
+  }
+
+  // Just provided the same implementation as the default in DFS, as that's
+  // just delegated to the FileSystem parent class.
+  @Override
+  protected Path fixRelativePart(Path p) {
+    return super.fixRelativePart(p);
+  }
+
+  Statistics getFsStatistics() {
+    if (this.vfs == null) {
+      return super.getFsStatistics();
+    }
+    return statistics;
+  }
+
+  DFSOpsCountStatistics getDFSOpsCountStatistics() {
+    if (this.vfs == null) {
+      return super.getDFSOpsCountStatistics();
+    }
+    return defaultDFS.getDFSOpsCountStatistics();
+  }
+
+  @Override
+  // Works only for HDFS
+  public HdfsDataOutputStreamBuilder createFile(Path path) {
+    if (this.vfs == null) {
+      return super.createFile(path);
+    }
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo = null;
+    try {
+      mountPathInfo = this.vfs.getMountPathInfo(path, getConf());
+    } catch (IOException e) {
+      // Mirror appendFile(): log the resolution failure and return null.
+      LOGGER.warn("Failed to resolve the path as mount path", e);
+      return null;
+    }
+    checkDFS(mountPathInfo.getTargetFs(), "createFile");
+    return (HdfsDataOutputStreamBuilder) mountPathInfo.getTargetFs()
+        .createFile(mountPathInfo.getPathOnTarget());
+  }
+
+  @Deprecated
+  @Override
+  public RemoteIterator<OpenFileEntry> listOpenFiles() throws IOException {
+    if (this.vfs == null) {
+      return super.listOpenFiles();
+    }
+    checkDefaultDFS(defaultDFS, "listOpenFiles");
+    return defaultDFS.listOpenFiles();
+  }
+
+  @Deprecated
+  @Override
+  public RemoteIterator<OpenFileEntry> listOpenFiles(
+      EnumSet<OpenFilesIterator.OpenFilesType> openFilesTypes)
+      throws IOException {
+    if (this.vfs == null) {
+      return super.listOpenFiles(openFilesTypes);
+    }
+    checkDefaultDFS(defaultDFS, "listOpenFiles");
+    return defaultDFS.listOpenFiles(openFilesTypes);
+  }
+
+  @Override
+  public RemoteIterator<OpenFileEntry> listOpenFiles(
+      EnumSet<OpenFilesIterator.OpenFilesType> openFilesTypes, String path)
+      throws IOException {
+    if (this.vfs == null) {
+      return super.listOpenFiles(openFilesTypes, path);
+    }
+    Path absF = fixRelativePart(new Path(path));
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo =
+        this.vfs.getMountPathInfo(absF, getConf());
+    checkDFS(mountPathInfo.getTargetFs(), "listOpenFiles");
+    return ((DistributedFileSystem) mountPathInfo.getTargetFs())
+        .listOpenFiles(openFilesTypes,
+            mountPathInfo.getPathOnTarget().toString());
+  }
+
+  @Override
+  public HdfsDataOutputStreamBuilder appendFile(Path path) {
+    if (this.vfs == null) {
+      return super.appendFile(path);
+    }
+    ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountPathInfo = null;
+    try {
+      mountPathInfo = this.vfs.getMountPathInfo(path, getConf());
+    } catch (IOException e) {
+      LOGGER.warn("Failed to resolve the path as mount path", e);
+      return null;
+    }
+    checkDFS(mountPathInfo.getTargetFs(), "appendFile");
+    return (HdfsDataOutputStreamBuilder) mountPathInfo.getTargetFs()
+        .appendFile(mountPathInfo.getPathOnTarget());
+  }
+
+  @Override
+  public boolean hasPathCapability(Path path, String capability)
+      throws IOException {
+    if (this.vfs == null) {
+      return super.hasPathCapability(path, capability);
+    }
+    return this.vfs.hasPathCapability(path, capability);
+  }
+
+  // The APIs below have implementations in ViewFS but not in DFS.
+  @Override
+  public Path resolvePath(final Path f) throws IOException {
+    if (this.vfs == null) {
+      return super.resolvePath(f);
+    }
+    return this.vfs.resolvePath(f);
+  }
+
+  @Override
+  @SuppressWarnings("deprecation")
+  public boolean delete(final Path f)
+      throws AccessControlException, FileNotFoundException, IOException {
+    if (this.vfs == null) {
+      return super.delete(f);
+    }
+    return this.vfs.delete(f);
+  }
+
+  @Override
+  public FileChecksum getFileChecksum(final Path f, final long length)
+      throws AccessControlException, FileNotFoundException, IOException {
+    if (this.vfs == null) {
+      return super.getFileChecksum(f, length);
+    }
+    return this.vfs.getFileChecksum(f, length);
+  }
+
+  @Override
+  public boolean mkdirs(Path dir) throws IOException {
+    if (this.vfs == null) {
+      return super.mkdirs(dir);
+    }
+    return this.vfs.mkdirs(dir);
+  }
+
+  @Override
+  public long getDefaultBlockSize(Path f) {
+    if (this.vfs == null) {
+      return super.getDefaultBlockSize(f);
+    }
+    return this.vfs.getDefaultBlockSize(f);
+  }
+
+  @Override
+  public short getDefaultReplication(Path f) {
+    if (this.vfs == null) {
+      return super.getDefaultReplication(f);
+    }
+    return this.vfs.getDefaultReplication(f);
+  }
+
+  @Override
+  public FsServerDefaults getServerDefaults(Path f) throws IOException {
+    if (this.vfs == null) {
+      return super.getServerDefaults(f);
+    }
+    return this.vfs.getServerDefaults(f);
+  }
+
+  @Override
+  public void setWriteChecksum(final boolean writeChecksum) {
+    if (this.vfs == null) {
+      super.setWriteChecksum(writeChecksum);
+      return;
+    }
+    this.vfs.setWriteChecksum(writeChecksum);
+  }
+
+  @Override
+  public FileSystem[] getChildFileSystems() {
+    if (this.vfs == null) {
+      return super.getChildFileSystems();
+    }
+    return this.vfs.getChildFileSystems();
+  }
+
+  public ViewFileSystem.MountPoint[] getMountPoints() {
+    if (this.vfs == null) {
+      return null;
+    }
+    return this.vfs.getMountPoints();
+  }
+
+  @Override
+  public FsStatus getStatus() throws IOException {
+    if (this.vfs == null) {
+      return super.getStatus();
+    }
+    return this.vfs.getStatus();
+  }
+
+  @Override
+  public long getUsed() throws IOException {
+    if (this.vfs == null) {
+      return super.getUsed();
+    }
+    return this.vfs.getUsed();
+  }
+}
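
Taken together, the methods above give ViewDistributedFileSystem three behaviors: path-based APIs resolve through the mount table to one target DFS, cluster-wide APIs either fan out to every child DFS or go to the default (fallback) DFS, and everything degrades to plain DistributedFileSystem behavior when no mount table is configured. Below is a minimal usage sketch; the namenode URIs and mount points are illustrative assumptions, not taken from this patch.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.viewfs.ConfigUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class ViewDfsUsageSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Keep the hdfs scheme, but back it with the mount-table aware class.
    conf.set("fs.hdfs.impl",
        "org.apache.hadoop.hdfs.ViewDistributedFileSystem");
    // Hypothetical mount table: /data lives on a second cluster (ns2).
    ConfigUtil.addLink(conf, "ns1", "/data", URI.create("hdfs://ns2/data"));
    ConfigUtil.addLinkFallback(conf, "ns1", URI.create("hdfs://ns1/"));

    try (FileSystem fs = FileSystem.get(URI.create("hdfs://ns1/"), conf)) {
      // Path-based call: resolved via the mount table, lands on ns2.
      fs.mkdirs(new Path("/data/warehouse"));
      // Cluster-wide calls go to the default DFS or fan out; to target a
      // single cluster, use its child file system directly.
      for (FileSystem child : fs.getChildFileSystems()) {
        if (child instanceof DistributedFileSystem) {
          ((DistributedFileSystem) child).listCachePools();
        }
      }
    }
  }
}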

+ 0 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java

@@ -41,7 +41,6 @@ import org.apache.hadoop.ha.ServiceFailedException;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DFSUtilClient;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
@@ -727,11 +726,6 @@ public class NameNode extends ReconfigurableBase implements
           intervals);
       }
     }
-    // Currently NN uses FileSystem.get to initialize DFS in startTrashEmptier.
-    // If fs.hdfs.impl was overridden by core-site.xml, we may get other
-    // filesystem. To make sure we get DFS, we are setting fs.hdfs.impl to DFS.
-    // HDFS-15450
-    conf.set(FS_HDFS_IMPL_KEY, DistributedFileSystem.class.getName());
 
     UserGroupInformation.setConfiguration(conf);
     loginAsNameNodeUser(conf);
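
The deleted block above was the HDFS-15450 workaround: startTrashEmptier initializes its file system through FileSystem.get, so the NameNode pinned fs.hdfs.impl to DistributedFileSystem in case core-site.xml had overridden it with a non-DFS class. The pin is no longer needed because the overriding class introduced by this patch is itself a DistributedFileSystem. A sketch of the invariant, reusing the illustrative ns1 URI from the sketch above:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class TrashEmptierInvariantSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Even if core-site.xml points the hdfs scheme at the view-aware class...
    conf.set("fs.hdfs.impl",
        "org.apache.hadoop.hdfs.ViewDistributedFileSystem");
    FileSystem fs = FileSystem.get(URI.create("hdfs://ns1/"), conf);
    // ...the trash emptier still receives DistributedFileSystem semantics.
    System.out.println(fs instanceof DistributedFileSystem); // prints true
  }
}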

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFSOverloadSchemeWithMountTableConfigInHDFS.java

@@ -37,8 +37,8 @@ public class TestViewFSOverloadSchemeWithMountTableConfigInHDFS
 
   @Before
   @Override
-  public void startCluster() throws IOException {
-    super.startCluster();
+  public void setUp() throws IOException {
+    super.setUp();
     String mountTableDir =
         URI.create(getConf().get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY))
             .toString() + "/MountTable/";

+ 94 - 48
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemOverloadSchemeWithHdfsScheme.java

@@ -36,13 +36,15 @@ import org.apache.hadoop.fs.FsConstants;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.test.PathUtils;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import static org.apache.hadoop.fs.viewfs.Constants.CONFIG_VIEWFS_IGNORE_PORT_IN_MOUNT_TABLE_NAME;
@@ -58,7 +60,7 @@ public class TestViewFileSystemOverloadSchemeWithHdfsScheme {
   private static final String FS_IMPL_PATTERN_KEY = "fs.%s.impl";
   private static final String HDFS_SCHEME = "hdfs";
   private Configuration conf = null;
-  private MiniDFSCluster cluster = null;
+  private static MiniDFSCluster cluster = null;
   private URI defaultFSURI;
   private File localTargetDir;
   private static final String TEST_ROOT_DIR = PathUtils
@@ -66,33 +68,52 @@ public class TestViewFileSystemOverloadSchemeWithHdfsScheme {
   private static final String HDFS_USER_FOLDER = "/HDFSUser";
   private static final String LOCAL_FOLDER = "/local";
 
+  @BeforeClass
+  public static void init() throws IOException {
+    cluster =
+        new MiniDFSCluster.Builder(new Configuration()).numDataNodes(2).build();
+    cluster.waitClusterUp();
+  }
+
   /**
    * Sets up the configurations and starts the MiniDFSCluster.
    */
   @Before
-  public void startCluster() throws IOException {
-    conf = new Configuration();
-    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
-        true);
-    conf.setInt(
+  public void setUp() throws IOException {
+    Configuration config = getNewConf();
+    config.setInt(
         CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
-    conf.set(String.format(FS_IMPL_PATTERN_KEY, HDFS_SCHEME),
+    config.set(String.format(FS_IMPL_PATTERN_KEY, HDFS_SCHEME),
         ViewFileSystemOverloadScheme.class.getName());
-    conf.set(String.format(
-        FsConstants.FS_VIEWFS_OVERLOAD_SCHEME_TARGET_FS_IMPL_PATTERN,
-        HDFS_SCHEME), DistributedFileSystem.class.getName());
-    conf.setBoolean(CONFIG_VIEWFS_IGNORE_PORT_IN_MOUNT_TABLE_NAME,
+    config.setBoolean(CONFIG_VIEWFS_IGNORE_PORT_IN_MOUNT_TABLE_NAME,
         CONFIG_VIEWFS_IGNORE_PORT_IN_MOUNT_TABLE_NAME_DEFAULT);
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
-    cluster.waitClusterUp();
+    setConf(config);
     defaultFSURI =
-        URI.create(conf.get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY));
+        URI.create(config.get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY));
     localTargetDir = new File(TEST_ROOT_DIR, "/root/");
+    localTargetDir.mkdirs();
     Assert.assertEquals(HDFS_SCHEME, defaultFSURI.getScheme()); // hdfs scheme.
   }
 
   @After
-  public void tearDown() throws IOException {
+  public void cleanUp() throws IOException {
+    if (cluster != null) {
+      FileSystem fs = new DistributedFileSystem();
+      fs.initialize(defaultFSURI, conf);
+      try {
+        FileStatus[] statuses = fs.listStatus(new Path("/"));
+        for (FileStatus st : statuses) {
+          Assert.assertTrue(fs.delete(st.getPath(), true));
+        }
+      } finally {
+        fs.close();
+      }
+      FileSystem.closeAll();
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
     if (cluster != null) {
       FileSystem.closeAll();
       cluster.shutdown();
@@ -132,9 +153,9 @@ public class TestViewFileSystemOverloadSchemeWithHdfsScheme {
     // /local/test
     Path localDir = new Path(LOCAL_FOLDER + "/test");
 
-    try (ViewFileSystemOverloadScheme fs
-        = (ViewFileSystemOverloadScheme) FileSystem.get(conf)) {
-      Assert.assertEquals(2, fs.getMountPoints().length);
+    try (FileSystem fs = FileSystem.get(conf)) {
+      Assert.assertEquals(2, fs.getChildFileSystems().length);
       fs.createNewFile(hdfsFile); // /HDFSUser/testfile
       fs.mkdirs(localDir); // /local/test
     }
@@ -166,8 +187,13 @@ public class TestViewFileSystemOverloadSchemeWithHdfsScheme {
    * hdfs://localhost:xxx/HDFSUser --> nonexistent://NonExistent/User/
    * It should fail to add non existent fs link.
    */
-  @Test(expected = IOException.class, timeout = 30000)
+  @Test(timeout = 30000)
   public void testMountLinkWithNonExistentLink() throws Exception {
+    testMountLinkWithNonExistentLink(true);
+  }
+
+  public void testMountLinkWithNonExistentLink(boolean expectFsInitFailure)
+      throws Exception {
     final String userFolder = "/User";
     final Path nonExistTargetPath =
         new Path("nonexistent://NonExistent" + userFolder);
@@ -176,10 +202,17 @@ public class TestViewFileSystemOverloadSchemeWithHdfsScheme {
      * Below addLink will create following mount points
      * hdfs://localhost:xxx/User --> nonexistent://NonExistent/User/
      */
-    addMountLinks(defaultFSURI.getAuthority(), new String[] {userFolder },
-        new String[] {nonExistTargetPath.toUri().toString() }, conf);
-    FileSystem.get(conf);
-    Assert.fail("Expected to fail with non existent link");
+    addMountLinks(defaultFSURI.getAuthority(), new String[] {userFolder},
+        new String[] {nonExistTargetPath.toUri().toString()}, conf);
+    if (expectFsInitFailure) {
+      LambdaTestUtils.intercept(IOException.class, () -> {
+        FileSystem.get(conf);
+      });
+    } else {
+      try (FileSystem fs = FileSystem.get(conf)) {
+        Assert.assertEquals("hdfs", fs.getScheme());
+      }
+    }
   }
 
   /**
@@ -271,14 +304,10 @@ public class TestViewFileSystemOverloadSchemeWithHdfsScheme {
 
     // check for viewfs path without authority
     Path viewFsRootPath = new Path("viewfs:/");
-    try {
-      viewFsRootPath.getFileSystem(conf);
-      Assert.fail(
-          "Mount table with authority default should not be initialized");
-    } catch (IOException e) {
-      assertTrue(e.getMessage().contains(
-          "Empty Mount table in config for viewfs://default/"));
-    }
+    LambdaTestUtils.intercept(IOException.class,
+        "Empty Mount table in config for viewfs://default", () -> {
+          viewFsRootPath.getFileSystem(conf);
+        });
 
     // set the name of the default mount table here and
     // subsequent calls should succeed.
@@ -334,18 +363,25 @@ public class TestViewFileSystemOverloadSchemeWithHdfsScheme {
    *
    * It cannot find any mount link. ViewFS expects a mount point from root.
    */
-  @Test(expected = NotInMountpointException.class, timeout = 30000)
-  public void testCreateOnRootShouldFailWhenMountLinkConfigured()
-      throws Exception {
+  @Test(timeout = 30000)
+  public void testCreateOnRoot() throws Exception {
+    testCreateOnRoot(false);
+  }
+
+  public void testCreateOnRoot(boolean fallbackExist) throws Exception {
     final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
     addMountLinks(defaultFSURI.getAuthority(),
-        new String[] {HDFS_USER_FOLDER, LOCAL_FOLDER },
+        new String[] {HDFS_USER_FOLDER, LOCAL_FOLDER},
         new String[] {hdfsTargetPath.toUri().toString(),
-            localTargetDir.toURI().toString() },
-        conf);
+            localTargetDir.toURI().toString()}, conf);
     try (FileSystem fs = FileSystem.get(conf)) {
-      fs.createNewFile(new Path("/newFileOnRoot"));
-      Assert.fail("It should fail as root is read only in viewFS.");
+      if (fallbackExist) {
+        Assert.assertTrue(fs.createNewFile(new Path("/newFileOnRoot")));
+      } else {
+        LambdaTestUtils.intercept(NotInMountpointException.class, () -> {
+          fs.createNewFile(new Path("/newFileOnRoot"));
+        });
+      }
     }
   }
 
@@ -433,15 +469,13 @@ public class TestViewFileSystemOverloadSchemeWithHdfsScheme {
         conf);
 
     // 1. Only 1 hdfs child file system should be there with cache.
-    try (ViewFileSystemOverloadScheme vfs =
-        (ViewFileSystemOverloadScheme) FileSystem.get(conf)) {
+    try (FileSystem vfs = FileSystem.get(conf)) {
       Assert.assertEquals(1, vfs.getChildFileSystems().length);
     }
 
     // 2. Two hdfs file systems should be there if no cache.
     conf.setBoolean(Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE, false);
-    try (ViewFileSystemOverloadScheme vfs =
-        (ViewFileSystemOverloadScheme) FileSystem.get(conf)) {
+    try (FileSystem vfs = FileSystem.get(conf)) {
       Assert.assertEquals(2, vfs.getChildFileSystems().length);
     }
   }
@@ -466,8 +500,7 @@ public class TestViewFileSystemOverloadSchemeWithHdfsScheme {
 
     conf.setBoolean(Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE, false);
     // Two hdfs file systems should be there if no cache.
-    try (ViewFileSystemOverloadScheme vfs =
-        (ViewFileSystemOverloadScheme) FileSystem.get(conf)) {
+    try (FileSystem vfs = FileSystem.get(conf)) {
       Assert.assertEquals(2, vfs.getChildFileSystems().length);
     }
   }
@@ -494,8 +527,7 @@ public class TestViewFileSystemOverloadSchemeWithHdfsScheme {
     // Only one local file system should be there if no InnerCache, but fs
     // cache should work.
     conf.setBoolean(Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE, false);
-    try (ViewFileSystemOverloadScheme vfs =
-        (ViewFileSystemOverloadScheme) FileSystem.get(conf)) {
+    try (FileSystem vfs = FileSystem.get(conf)) {
       Assert.assertEquals(1, vfs.getChildFileSystems().length);
     }
   }
@@ -656,4 +688,18 @@ public class TestViewFileSystemOverloadSchemeWithHdfsScheme {
   public Configuration getConf() {
     return this.conf;
   }
+
+  /**
+   * @return a new configuration seeded from the running cluster.
+   */
+  public Configuration getNewConf() {
+    return new Configuration(cluster.getConfiguration(0));
+  }
+
+  /**
+   * Sets the configuration.
+   */
+  public void setConf(Configuration config) {
+    conf = config;
+  }
 }

+ 14 - 14
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java

@@ -139,7 +139,7 @@ public class TestDistributedFileSystem {
   
   private boolean noXmlDefaults = false;
   
-  private HdfsConfiguration getTestConfiguration() {
+  HdfsConfiguration getTestConfiguration() {
     HdfsConfiguration conf;
     if (noXmlDefaults) {
       conf = new HdfsConfiguration(false);
@@ -810,7 +810,7 @@ public class TestDistributedFileSystem {
 
   @Test
   public void testStatistics2() throws IOException, NoSuchAlgorithmException {
-    HdfsConfiguration conf = new HdfsConfiguration();
+    HdfsConfiguration conf = getTestConfiguration();
     conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
         StoragePolicySatisfierMode.EXTERNAL.toString());
     File tmpDir = GenericTestUtils.getTestDir(UUID.randomUUID().toString());
@@ -1457,7 +1457,7 @@ public class TestDistributedFileSystem {
 
   @Test(timeout=60000)
   public void testFileCloseStatus() throws IOException {
-    Configuration conf = new HdfsConfiguration();
+    Configuration conf = getTestConfiguration();
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
     DistributedFileSystem fs = cluster.getFileSystem();
     try {
@@ -1477,7 +1477,7 @@ public class TestDistributedFileSystem {
 
   @Test
   public void testCreateWithStoragePolicy() throws Throwable {
-    Configuration conf = new HdfsConfiguration();
+    Configuration conf = getTestConfiguration();
     try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
         .storageTypes(
             new StorageType[] {StorageType.DISK, StorageType.ARCHIVE,
@@ -1516,7 +1516,7 @@ public class TestDistributedFileSystem {
 
   @Test(timeout=60000)
   public void testListFiles() throws IOException {
-    Configuration conf = new HdfsConfiguration();
+    Configuration conf = getTestConfiguration();
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
     
     try {
@@ -1539,7 +1539,7 @@ public class TestDistributedFileSystem {
 
   @Test
   public void testListStatusOfSnapshotDirs() throws IOException {
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(new HdfsConfiguration())
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(getTestConfiguration())
         .build();
     try {
       DistributedFileSystem dfs = cluster.getFileSystem();
@@ -1559,7 +1559,7 @@ public class TestDistributedFileSystem {
   @Test(timeout=10000)
   public void testDFSClientPeerReadTimeout() throws IOException {
     final int timeout = 1000;
-    final Configuration conf = new HdfsConfiguration();
+    final Configuration conf = getTestConfiguration();
     conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, timeout);
 
     // only need cluster to create a dfs client to get a peer
@@ -1593,7 +1593,7 @@ public class TestDistributedFileSystem {
 
   @Test(timeout=60000)
   public void testGetServerDefaults() throws IOException {
-    Configuration conf = new HdfsConfiguration();
+    Configuration conf = getTestConfiguration();
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
     try {
       cluster.waitActive();
@@ -1608,7 +1608,7 @@ public class TestDistributedFileSystem {
   @Test(timeout=10000)
   public void testDFSClientPeerWriteTimeout() throws IOException {
     final int timeout = 1000;
-    final Configuration conf = new HdfsConfiguration();
+    final Configuration conf = getTestConfiguration();
     conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, timeout);
 
     // only need cluster to create a dfs client to get a peer
@@ -1645,7 +1645,7 @@ public class TestDistributedFileSystem {
 
   @Test(timeout = 30000)
   public void testTotalDfsUsed() throws Exception {
-    Configuration conf = new HdfsConfiguration();
+    Configuration conf = getTestConfiguration();
     MiniDFSCluster cluster = null;
     try {
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
@@ -1832,7 +1832,7 @@ public class TestDistributedFileSystem {
 
   @Test
   public void testSuperUserPrivilege() throws Exception {
-    HdfsConfiguration conf = new HdfsConfiguration();
+    HdfsConfiguration conf = getTestConfiguration();
     File tmpDir = GenericTestUtils.getTestDir(UUID.randomUUID().toString());
     final Path jksPath = new Path(tmpDir.toString(), "test.jks");
     conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
@@ -1885,7 +1885,7 @@ public class TestDistributedFileSystem {
 
   @Test
   public void testListingStoragePolicyNonSuperUser() throws Exception {
-    HdfsConfiguration conf = new HdfsConfiguration();
+    HdfsConfiguration conf = getTestConfiguration();
     try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build()) {
       cluster.waitActive();
       final DistributedFileSystem dfs = cluster.getFileSystem();
@@ -2037,7 +2037,7 @@ public class TestDistributedFileSystem {
   @Test
   public void testStorageFavouredNodes()
       throws IOException, InterruptedException, TimeoutException {
-    Configuration conf = new HdfsConfiguration();
+    Configuration conf = getTestConfiguration();
     try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
         .storageTypes(new StorageType[] {StorageType.SSD, StorageType.DISK})
         .numDataNodes(3).storagesPerDatanode(2).build()) {
@@ -2062,7 +2062,7 @@ public class TestDistributedFileSystem {
 
   @Test
   public void testGetECTopologyResultForPolicies() throws Exception {
-    Configuration conf = new HdfsConfiguration();
+    Configuration conf = getTestConfiguration();
     try (MiniDFSCluster cluster = DFSTestUtil.setupCluster(conf, 9, 3, 0)) {
       DistributedFileSystem dfs = cluster.getFileSystem();
       dfs.enableErasureCodingPolicy("RS-6-3-1024k");

+ 47 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestViewDistributedFileSystem.java

@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.test.Whitebox;
+
+import java.io.IOException;
+
+public class TestViewDistributedFileSystem extends TestDistributedFileSystem {
+  @Override
+  HdfsConfiguration getTestConfiguration() {
+    HdfsConfiguration conf = super.getTestConfiguration();
+    conf.set("fs.hdfs.impl", ViewDistributedFileSystem.class.getName());
+    return conf;
+  }
+
+  @Override
+  public void testStatistics() throws IOException {
+    FileSystem.getStatistics(HdfsConstants.HDFS_URI_SCHEME,
+        ViewDistributedFileSystem.class).reset();
+    @SuppressWarnings("unchecked")
+    ThreadLocal<FileSystem.Statistics.StatisticsData> data =
+        (ThreadLocal<FileSystem.Statistics.StatisticsData>) Whitebox
+            .getInternalState(FileSystem
+                .getStatistics(HdfsConstants.HDFS_URI_SCHEME,
+                    ViewDistributedFileSystem.class), "threadData");
+    data.set(null);
+    super.testStatistics();
+  }
+}
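
The testStatistics override is needed because FileSystem statistics are registered per (scheme, implementation class) pair: the parent test resets the counters kept for DistributedFileSystem, while this subclass accumulates under ViewDistributedFileSystem, and the Whitebox call clears its stale per-thread data. The lookup in isolation (a sketch, not part of the patch):

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.ViewDistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;

public class StatsResetSketch {
  static void resetViewDfsStatistics() {
    // Statistics are keyed by scheme plus the concrete FileSystem class, so
    // this subclass must reset its own entry, not DistributedFileSystem's.
    FileSystem.getStatistics(HdfsConstants.HDFS_URI_SCHEME,
        ViewDistributedFileSystem.class).reset();
  }
}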

+ 94 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestViewDistributedFileSystemContract.java

@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemContractBaseTest;
+import org.apache.hadoop.fs.viewfs.ConfigUtil;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+
+public class TestViewDistributedFileSystemContract
+    extends TestHDFSFileSystemContract {
+  private static MiniDFSCluster cluster;
+  private static String defaultWorkingDirectory;
+  private static Configuration conf = new HdfsConfiguration();
+
+  @BeforeClass
+  public static void init() throws IOException {
+    final File basedir = GenericTestUtils.getRandomizedTestDir();
+    conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,
+        FileSystemContractBaseTest.TEST_UMASK);
+    cluster = new MiniDFSCluster.Builder(conf, basedir)
+        .numDataNodes(2)
+        .build();
+    defaultWorkingDirectory =
+        "/user/" + UserGroupInformation.getCurrentUser().getShortUserName();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    conf.set("fs.hdfs.impl", ViewDistributedFileSystem.class.getName());
+    URI defaultFSURI =
+        URI.create(conf.get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY));
+    ConfigUtil.addLink(conf, defaultFSURI.getHost(), "/user",
+        defaultFSURI);
+    ConfigUtil.addLinkFallback(conf, defaultFSURI.getHost(),
+        defaultFSURI);
+    fs = FileSystem.get(conf);
+  }
+
+  @AfterClass
+  public static void tearDownAfter() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  @Override
+  protected String getDefaultWorkingDirectory() {
+    return defaultWorkingDirectory;
+  }
+
+  @Test
+  public void testRenameRootDirForbidden() throws Exception {
+    LambdaTestUtils.intercept(AccessControlException.class,
+        "InternalDir of ViewFileSystem is readonly", () -> {
+          super.testRenameRootDirForbidden();
+        });
+  }
+
+  @Ignore("Ignore this test until HDFS-15532")
+  @Override
+  public void testLSRootDir() throws Throwable {
+  }
+}

+ 64 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestViewDistributedFileSystemWithMountLinks.java

@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.viewfs.ConfigUtil;
+import org.apache.hadoop.fs.viewfs.TestViewFileSystemOverloadSchemeWithHdfsScheme;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URI;
+
+import static org.apache.hadoop.fs.viewfs.Constants.CONFIG_VIEWFS_IGNORE_PORT_IN_MOUNT_TABLE_NAME;
+import static org.apache.hadoop.fs.viewfs.Constants.CONFIG_VIEWFS_IGNORE_PORT_IN_MOUNT_TABLE_NAME_DEFAULT;
+
+public class TestViewDistributedFileSystemWithMountLinks extends
+    TestViewFileSystemOverloadSchemeWithHdfsScheme {
+  @Override
+  public void setUp() throws IOException {
+    super.setUp();
+    Configuration conf = getConf();
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
+        true);
+    conf.setInt(
+        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
+    conf.set("fs.hdfs.impl",
+        ViewDistributedFileSystem.class.getName());
+    conf.setBoolean(CONFIG_VIEWFS_IGNORE_PORT_IN_MOUNT_TABLE_NAME,
+        CONFIG_VIEWFS_IGNORE_PORT_IN_MOUNT_TABLE_NAME_DEFAULT);
+    URI defaultFSURI =
+        URI.create(conf.get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY));
+    ConfigUtil.addLinkFallback(conf, defaultFSURI.getAuthority(),
+        new Path(defaultFSURI.toString()).toUri());
+    setConf(conf);
+  }
+
+  @Test(timeout = 30000)
+  public void testCreateOnRoot() throws Exception {
+    testCreateOnRoot(true);
+  }
+
+  @Test(timeout = 30000)
+  public void testMountLinkWithNonExistentLink() throws Exception {
+    testMountLinkWithNonExistentLink(false);
+  }
+}
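
Both overrides invert the parent class's expectations because setUp adds a fallback link: with a fallback configured, a path that matches no mount point resolves against the fallback target instead of failing, so the root becomes writable and a bad mount link no longer aborts initialization. A condensed sketch with illustrative authorities:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.viewfs.ConfigUtil;

public class FallbackLinkSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.hdfs.impl",
        "org.apache.hadoop.hdfs.ViewDistributedFileSystem");
    ConfigUtil.addLink(conf, "ns1", "/data", URI.create("hdfs://ns2/data"));
    // Without this fallback, the create below would throw
    // NotInMountpointException, since "/" matches no mount point.
    ConfigUtil.addLinkFallback(conf, "ns1", URI.create("hdfs://ns1/"));
    try (FileSystem fs = FileSystem.get(URI.create("hdfs://ns1/"), conf)) {
      fs.createNewFile(new Path("/newFileOnRoot")); // lands on the fallback
    }
  }
}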

+ 24 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java

@@ -132,17 +132,23 @@ public class TestCacheDirectives {
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES, 2);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES,
         2);
-
     return conf;
   }
 
+  /**
+   * @return the configuration.
+   */
+  Configuration getConf() {
+    return this.conf;
+  }
+
   @Before
   public void setup() throws Exception {
     conf = createCachingConf();
     cluster =
         new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
     cluster.waitActive();
-    dfs = cluster.getFileSystem();
+    dfs = getDFS();
     proto = cluster.getNameNodeRpc();
     namenode = cluster.getNameNode();
     prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
@@ -150,6 +156,13 @@ public class TestCacheDirectives {
     BlockReaderTestUtil.enableHdfsCachingTracing();
   }
 
+  /**
+   * @return the dfs instance under test; subclasses may override this.
+   */
+  DistributedFileSystem getDFS() throws IOException {
+    return (DistributedFileSystem) FileSystem.get(conf);
+  }
+
   @After
   public void teardown() throws Exception {
     // Remove cache directives left behind by tests so that we release mmaps.
@@ -1613,6 +1626,14 @@ public class TestCacheDirectives {
             "testAddingCacheDirectiveInfosWhenCachingIsDisabled:2");
   }
 
+  /**
+   * @return the dfs instance for the namenode at index nnIdx.
+   */
+  DistributedFileSystem getDFS(MiniDFSCluster cluster, int nnIdx)
+      throws IOException {
+    return cluster.getFileSystem(nnIdx);
+  }
+
   @Test(timeout=120000)
   public void testExpiryTimeConsistency() throws Exception {
     conf.setInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY, 1);
@@ -1623,7 +1644,7 @@ public class TestCacheDirectives {
             .build();
     dfsCluster.transitionToActive(0);
 
-    DistributedFileSystem fs = dfsCluster.getFileSystem(0);
+    DistributedFileSystem fs = getDFS(dfsCluster, 0);
     final NameNode ann = dfsCluster.getNameNode(0);
 
     final Path filename = new Path("/file");

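Note: the refactoring above replaces direct cluster.getFileSystem() calls with overridable getDFS() hooks, a template-method seam that lets a subclass swap in a different filesystem flavour without duplicating test logic. A hypothetical sketch of the pattern, assuming only the Hadoop client API (BaseDfsTest and ViewDfsTest are illustrative names, not classes in this patch):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;

class BaseDfsTest {
  protected Configuration conf = new Configuration();

  /** Hook: subclasses may return a differently configured filesystem. */
  DistributedFileSystem getDFS() throws IOException {
    return (DistributedFileSystem) FileSystem.get(conf);
  }

  void runScenario() throws IOException {
    DistributedFileSystem dfs = getDFS();
    // ... exercise caching behaviour against dfs ...
  }
}

class ViewDfsTest extends BaseDfsTest {
  @Override
  DistributedFileSystem getDFS() throws IOException {
    // Point the hdfs:// scheme at the view-based implementation before
    // the base class resolves the filesystem; the cast above still holds
    // because ViewDistributedFileSystem extends DistributedFileSystem.
    conf.set("fs.hdfs.impl",
        "org.apache.hadoop.hdfs.ViewDistributedFileSystem");
    return super.getDFS();
  }
}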
+ 56 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectivesWithViewDFS.java

@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.viewfs.ConfigUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.ViewDistributedFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+
+public class TestCacheDirectivesWithViewDFS extends TestCacheDirectives {
+
+  @Override
+  public DistributedFileSystem getDFS() throws IOException {
+    Configuration conf = getConf();
+    conf.set("fs.hdfs.impl", ViewDistributedFileSystem.class.getName());
+    URI defaultFSURI =
+        URI.create(conf.get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY));
+    ConfigUtil.addLinkFallback(conf, defaultFSURI.getHost(),
+        new Path(defaultFSURI.toString()).toUri());
+    ConfigUtil.addLink(conf, defaultFSURI.getHost(), "/tmp",
+        new Path(defaultFSURI.toString()).toUri());
+    return super.getDFS();
+  }
+
+  @Override
+  public DistributedFileSystem getDFS(MiniDFSCluster cluster, int nnIdx)
+      throws IOException {
+    Configuration conf = cluster.getConfiguration(nnIdx);
+    conf.set("fs.hdfs.impl", ViewDistributedFileSystem.class.getName());
+    URI uri = cluster.getURI(nnIdx);
+    ConfigUtil.addLinkFallback(conf, uri.getHost(), uri);
+    ConfigUtil.addLink(conf, uri.getHost(), "/tmp", uri);
+    return cluster.getFileSystem(nnIdx);
+  }
+}
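
Note: the ConfigUtil.addLink and ConfigUtil.addLinkFallback calls above expand into fs.viewfs.mounttable.* properties keyed by the mount table name. A small sketch that prints the keys such calls generate, assuming a placeholder cluster URI of hdfs://nn-host:8020 (MountTableKeysSketch is an illustrative name; key names follow the fs.viewfs.mounttable.<table>.link pattern):

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.viewfs.ConfigUtil;

public class MountTableKeysSketch {
  public static void main(String[] args) {
    // Start from an empty Configuration so only the mount keys print.
    Configuration conf = new Configuration(false);
    URI target = URI.create("hdfs://nn-host:8020"); // placeholder URI
    String mountTable = target.getHost();

    ConfigUtil.addLinkFallback(conf, mountTable, target);
    ConfigUtil.addLink(conf, mountTable, "/tmp", target);

    // Expected keys (values are the target URI), e.g.:
    //   fs.viewfs.mounttable.nn-host.linkFallback
    //   fs.viewfs.mounttable.nn-host.link./tmp
    conf.forEach(entry -> {
      if (entry.getKey().startsWith("fs.viewfs.mounttable")) {
        System.out.println(entry.getKey() + " = " + entry.getValue());
      }
    });
  }
}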