Jelajahi Sumber

HDFS-16791. Add getEnclosingRoot() API to filesystem interface and implementations (#6198)

The enclosing root path is a common ancestor that should be used for temp and staging dirs
as well as within encryption zones and other restricted directories.

Contributed by Tom McCormick
Tom 1 tahun lalu
induk
melakukan
e82d83ee33
29 mengubah file dengan 879 tambahan dan 33 penghapusan
  1. 18 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java
  2. 18 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
  3. 5 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
  4. 5 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFs.java
  5. 37 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
  6. 17 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java
  7. 34 1
      hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
  8. 94 0
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestGetEnclosingRoot.java
  9. 2 0
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
  10. 103 0
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractGetEnclosingRoot.java
  11. 32 0
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractGetEnclosingRoot.java
  12. 32 0
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/rawlocal/TestRawlocalContractGetEnclosingRoot.java
  13. 10 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
  14. 1 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOpsCountStatistics.java
  15. 26 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
  16. 8 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
  17. 18 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
  18. 10 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
  19. 1 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/protocol/TestReadOnly.java
  20. 33 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
  21. 7 1
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
  22. 25 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTable.java
  23. 19 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
  24. 15 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  25. 7 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
  26. 121 28
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemHdfs.java
  27. 27 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
  28. 5 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
  29. 149 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEnclosingRoot.java

+ 18 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java

@@ -1639,6 +1639,24 @@ public abstract class AbstractFileSystem implements PathCapabilities {
     return null;
   }
 
+  /**
+   * Return path of the enclosing root for a given path
+   * The enclosing root path is a common ancestor that should be used for temp and staging dirs
+   * as well as within encryption zones and other restricted directories.
+   *
+   * Call makeQualified on the param path to ensure its part of the correct filesystem
+   *
+   * @param path file path to find the enclosing root path for
+   * @return a path to the enclosing root
+   * @throws IOException early checks like failure to resolve path cause IO failures
+   */
+  @InterfaceAudience.Public
+  @InterfaceStability.Unstable
+  public Path getEnclosingRoot(Path path) throws IOException {
+    makeQualified(path);
+    return makeQualified(new Path("/"));
+  }
+
   /**
    * Helper method that throws an {@link UnsupportedOperationException} for the
    * current {@link FileSystem} method being called.

+ 18 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java

@@ -4920,6 +4920,24 @@ public abstract class FileSystem extends Configured
 
   }
 
+  /**
+   * Return path of the enclosing root for a given path.
+   * The enclosing root path is a common ancestor that should be used for temp and staging dirs
+   * as well as within encryption zones and other restricted directories.
+   *
+   * Call makeQualified on the param path to ensure its part of the correct filesystem.
+   *
+   * @param path file path to find the enclosing root path for
+   * @return a path to the enclosing root
+   * @throws IOException early checks like failure to resolve path cause IO failures
+   */
+  @InterfaceAudience.Public
+  @InterfaceStability.Unstable
+  public Path getEnclosingRoot(Path path) throws IOException {
+    this.makeQualified(path);
+    return this.makeQualified(new Path("/"));
+  }
+
   /**
    * Create a multipart uploader.
    * @param basePath file path under which all files are uploaded

+ 5 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java

@@ -732,6 +732,11 @@ public class FilterFileSystem extends FileSystem {
     return fs.openFileWithOptions(pathHandle, parameters);
   }
 
+  @Override
+  public Path getEnclosingRoot(Path path) throws IOException {
+    return fs.getEnclosingRoot(path);
+  }
+
   @Override
   public boolean hasPathCapability(final Path path, final String capability)
       throws IOException {

+ 5 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFs.java

@@ -459,4 +459,9 @@ public abstract class FilterFs extends AbstractFileSystem {
       throws IOException {
     return myFs.createMultipartUploader(basePath);
   }
+
+  @Override
+  public Path getEnclosingRoot(Path path) throws IOException {
+    return myFs.getEnclosingRoot(path);
+  }
 }

+ 37 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java

@@ -1368,6 +1368,24 @@ public class ViewFileSystem extends FileSystem {
     }
   }
 
+  @Override
+  public Path getEnclosingRoot(Path path) throws IOException {
+    InodeTree.ResolveResult<FileSystem> res;
+    try {
+      res = fsState.resolve(getUriPath(path), true);
+    } catch (FileNotFoundException ex) {
+      NotInMountpointException mountPointEx =
+          new NotInMountpointException(path,
+              String.format("getEnclosingRoot - %s", ex.getMessage()));
+      mountPointEx.initCause(ex);
+      throw mountPointEx;
+    }
+    Path mountPath = new Path(res.resolvedPath);
+    Path enclosingPath = res.targetFileSystem.getEnclosingRoot(new Path(getUriPath(path)));
+    return fixRelativePart(this.makeQualified(enclosingPath.depth() > mountPath.depth()
+        ?  enclosingPath : mountPath));
+  }
+
   /**
    * An instance of this class represents an internal dir of the viewFs
    * that is internal dir of the mount table.
@@ -1922,6 +1940,25 @@ public class ViewFileSystem extends FileSystem {
       }
       return allPolicies;
     }
+
+    @Override
+    public Path getEnclosingRoot(Path path) throws IOException {
+      InodeTree.ResolveResult<FileSystem> res;
+      try {
+        res = fsState.resolve((path.toString()), true);
+      } catch (FileNotFoundException ex) {
+        NotInMountpointException mountPointEx =
+            new NotInMountpointException(path,
+            String.format("getEnclosingRoot - %s", ex.getMessage()));
+        mountPointEx.initCause(ex);
+        throw mountPointEx;
+      }
+      Path fullPath = new Path(res.resolvedPath);
+      Path enclosingPath = res.targetFileSystem.getEnclosingRoot(path);
+      return enclosingPath.depth() > fullPath.depth()
+          ?  enclosingPath
+          : fullPath;
+    }
   }
 
   enum RenameStrategy {

+ 17 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java

@@ -1479,5 +1479,22 @@ public class ViewFs extends AbstractFileSystem {
         throws IOException {
       throw readOnlyMountTable("setStoragePolicy", path);
     }
+
+    @Override
+    public Path getEnclosingRoot(Path path) throws IOException {
+      InodeTree.ResolveResult<AbstractFileSystem> res;
+      try {
+        res = fsState.resolve((path.toString()), true);
+      } catch (FileNotFoundException ex) {
+        NotInMountpointException mountPointEx =
+            new NotInMountpointException(path,
+                String.format("getEnclosingRoot - %s", ex.getMessage()));
+        mountPointEx.initCause(ex);
+        throw mountPointEx;
+      }
+      Path fullPath = new Path(res.resolvedPath);
+      Path enclosingPath = res.targetFileSystem.getEnclosingRoot(path);
+      return enclosingPath.depth() > fullPath.depth() ?  enclosingPath : fullPath;
+    }
   }
 }

+ 34 - 1
hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md

@@ -601,7 +601,40 @@ on the filesystem.
 
 1. The outcome of this operation MUST be identical to the value of
    `getFileStatus(P).getBlockSize()`.
-1. By inference, it MUST be > 0 for any file of length > 0.
+2. By inference, it MUST be > 0 for any file of length > 0.
+
+###  `Path getEnclosingRoot(Path p)`
+
+This method is used to find a root directory for a path given. This is useful for creating
+staging and temp directories in the same enclosing root directory. There are constraints around how
+renames are allowed to atomically occur (ex. across hdfs volumes or across encryption zones).
+
+For any two paths p1 and p2 that do not have the same enclosing root, `rename(p1, p2)` is expected to fail or will not
+be atomic.
+
+For object stores, even with the same enclosing root, there is no guarantee file or directory rename is atomic
+
+The following statement is always true:
+`getEnclosingRoot(p) == getEnclosingRoot(getEnclosingRoot(p))`
+
+
+```python
+path in ancestors(FS, p) or path == p:
+isDir(FS, p)
+```
+
+#### Preconditions
+
+The path does not have to exist, but the path does need to be valid and reconcilable by the filesystem
+* if a linkfallback is used all paths are reconcilable
+* if a linkfallback is not used there must be a mount point covering the path
+
+
+#### Postconditions
+
+* The path returned will not be null, if there is no deeper enclosing root, the root path ("/") will be returned.
+* The path returned is a directory
+
 
 ## <a name="state_changing_operations"></a> State Changing Operations
 

+ 94 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestGetEnclosingRoot.java

@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.HadoopTestBase;
+import org.junit.Test;
+
+public class TestGetEnclosingRoot extends HadoopTestBase {
+  @Test
+  public void testEnclosingRootEquivalence() throws IOException {
+    FileSystem fs = getFileSystem();
+    Path root = path("/");
+    Path foobar = path("/foo/bar");
+
+    assertEquals(root, fs.getEnclosingRoot(root));
+    assertEquals(root, fs.getEnclosingRoot(foobar));
+    assertEquals(root, fs.getEnclosingRoot(fs.getEnclosingRoot(foobar)));
+    assertEquals(fs.getEnclosingRoot(root), fs.getEnclosingRoot(foobar));
+
+    assertEquals(root, fs.getEnclosingRoot(path(foobar.toString())));
+    assertEquals(root, fs.getEnclosingRoot(fs.getEnclosingRoot(path(foobar.toString()))));
+    assertEquals(fs.getEnclosingRoot(root), fs.getEnclosingRoot(path(foobar.toString())));
+  }
+
+  @Test
+  public void testEnclosingRootPathExists() throws Exception {
+    FileSystem fs = getFileSystem();
+    Path root = path("/");
+    Path foobar = path("/foo/bar");
+    fs.mkdirs(foobar);
+
+    assertEquals(root, fs.getEnclosingRoot(foobar));
+    assertEquals(root, fs.getEnclosingRoot(path(foobar.toString())));
+  }
+
+  @Test
+  public void testEnclosingRootPathDNE() throws Exception {
+    FileSystem fs = getFileSystem();
+    Path foobar = path("/foo/bar");
+    Path root = path("/");
+
+    assertEquals(root, fs.getEnclosingRoot(foobar));
+    assertEquals(root, fs.getEnclosingRoot(path(foobar.toString())));
+  }
+
+  @Test
+  public void testEnclosingRootWrapped() throws Exception {
+    FileSystem fs = getFileSystem();
+    Path root = path("/");
+
+    assertEquals(root, fs.getEnclosingRoot(new Path("/foo/bar")));
+
+    UserGroupInformation ugi = UserGroupInformation.createRemoteUser("foo");
+    Path p = ugi.doAs((PrivilegedExceptionAction<Path>) () -> {
+      FileSystem wFs = getFileSystem();
+      return wFs.getEnclosingRoot(new Path("/foo/bar"));
+    });
+    assertEquals(root, p);
+  }
+
+  private FileSystem getFileSystem() throws IOException {
+    return FileSystem.get(new Configuration());
+  }
+
+  /**
+   * Create a path under the test path provided by
+   * the FS contract.
+   * @param filepath path string in
+   * @return a path qualified by the test filesystem
+   * @throws IOException IO problems
+   */
+  private Path path(String filepath) throws IOException {
+    return getFileSystem().makeQualified(
+        new Path(filepath));
+  }}

+ 2 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java

@@ -255,6 +255,8 @@ public class TestHarFileSystem {
 
     FSDataOutputStream append(Path f, int bufferSize,
         Progressable progress, boolean appendToNewBlock) throws IOException;
+
+    Path getEnclosingRoot(Path path) throws IOException;
   }
 
   @Test

+ 103 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractGetEnclosingRoot.java

@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.contract;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public abstract class AbstractContractGetEnclosingRoot extends AbstractFSContractTestBase {
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractContractGetEnclosingRoot.class);
+
+  @Test
+  public void testEnclosingRootEquivalence() throws IOException {
+    FileSystem fs = getFileSystem();
+    Path root = path("/");
+    Path foobar = path("/foo/bar");
+
+    assertEquals("Ensure getEnclosingRoot on the root directory returns the root directory",
+        root, fs.getEnclosingRoot(foobar));
+    assertEquals("Ensure getEnclosingRoot called on itself returns the root directory",
+        root, fs.getEnclosingRoot(fs.getEnclosingRoot(foobar)));
+    assertEquals(
+        "Ensure getEnclosingRoot for different paths in the same enclosing root "
+            + "returns the same path",
+        fs.getEnclosingRoot(root), fs.getEnclosingRoot(foobar));
+    assertEquals("Ensure getEnclosingRoot on a path returns the root directory",
+        root, fs.getEnclosingRoot(methodPath()));
+    assertEquals("Ensure getEnclosingRoot called on itself on a path returns the root directory",
+        root, fs.getEnclosingRoot(fs.getEnclosingRoot(methodPath())));
+    assertEquals(
+        "Ensure getEnclosingRoot for different paths in the same enclosing root "
+            + "returns the same path",
+        fs.getEnclosingRoot(root),
+        fs.getEnclosingRoot(methodPath()));
+  }
+
+
+  @Test
+  public void testEnclosingRootPathExists() throws Exception {
+    FileSystem fs = getFileSystem();
+    Path root = path("/");
+    Path foobar = methodPath();
+    fs.mkdirs(foobar);
+
+    assertEquals(
+        "Ensure getEnclosingRoot returns the root directory when the root directory exists",
+        root, fs.getEnclosingRoot(foobar));
+    assertEquals("Ensure getEnclosingRoot returns the root directory when the directory exists",
+        root, fs.getEnclosingRoot(foobar));
+  }
+
+  @Test
+  public void testEnclosingRootPathDNE() throws Exception {
+    FileSystem fs = getFileSystem();
+    Path foobar = path("/foo/bar");
+    Path root = path("/");
+
+    // .
+    assertEquals(
+        "Ensure getEnclosingRoot returns the root directory even when the path does not exist",
+        root, fs.getEnclosingRoot(foobar));
+    assertEquals(
+        "Ensure getEnclosingRoot returns the root directory even when the path does not exist",
+        root, fs.getEnclosingRoot(methodPath()));
+  }
+
+  @Test
+  public void testEnclosingRootWrapped() throws Exception {
+    FileSystem fs = getFileSystem();
+    Path root = path("/");
+
+    assertEquals("Ensure getEnclosingRoot returns the root directory when the directory exists",
+        root, fs.getEnclosingRoot(new Path("/foo/bar")));
+
+    UserGroupInformation ugi = UserGroupInformation.createRemoteUser("foo");
+    Path p = ugi.doAs((PrivilegedExceptionAction<Path>) () -> {
+      FileSystem wFs = getContract().getTestFileSystem();
+      return wFs.getEnclosingRoot(new Path("/foo/bar"));
+    });
+    assertEquals("Ensure getEnclosingRoot works correctly within a wrapped FileSystem", root, p);
+  }
+}

+ 32 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractGetEnclosingRoot.java

@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.contract.localfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractGetEnclosingRoot;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+
+public class TestLocalFSContractGetEnclosingRoot
+    extends AbstractContractGetEnclosingRoot {
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new LocalFSContract(conf);
+  }
+}

+ 32 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/rawlocal/TestRawlocalContractGetEnclosingRoot.java

@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.contract.rawlocal;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractGetEnclosingRoot;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+
+public class TestRawlocalContractGetEnclosingRoot extends AbstractContractGetEnclosingRoot {
+
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new RawlocalFSContract(conf);
+  }
+}

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -3442,4 +3442,14 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     }
   }
 
+  public Path getEnclosingRoot(String src) throws IOException {
+    checkOpen();
+    try (TraceScope ignored = newPathTraceScope("getEnclosingRoot", src)) {
+      return namenode.getEnclosingRoot(src);
+    } catch (RemoteException re) {
+      throw re.unwrapRemoteException(AccessControlException.class,
+          UnresolvedPathException.class);
+    }
+  }
+
 }

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOpsCountStatistics.java

@@ -64,6 +64,7 @@ public class DFSOpsCountStatistics extends StorageStatistics {
     GET_EC_CODECS("op_get_ec_codecs"),
     GET_EC_POLICY("op_get_ec_policy"),
     GET_EC_POLICIES("op_get_ec_policies"),
+    GET_ENCLOSING_ROOT("op_get_enclosing_root"),
     GET_ENCRYPTION_ZONE("op_get_encryption_zone"),
     GET_FILE_BLOCK_LOCATIONS("op_get_file_block_locations"),
     GET_FILE_CHECKSUM(CommonStatisticNames.OP_GET_FILE_CHECKSUM),

+ 26 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java

@@ -3740,4 +3740,30 @@ public class DistributedFileSystem extends FileSystem
     return dfs.slowDatanodeReport();
   }
 
+  /**
+   * Return path of the enclosing root for a given path
+   * The enclosing root path is a common ancestor that should be used for temp and staging dirs
+   * as well as within encryption zones and other restricted directories.
+   *
+   * @param path file path to find the enclosing root path for
+   * @return a path to the enclosing root
+   * @throws IOException early checks like failure to resolve path cause IO failures
+   */
+  public Path getEnclosingRoot(final Path path) throws IOException {
+    statistics.incrementReadOps(1);
+    storageStatistics.incrementOpCounter(OpType.GET_ENCLOSING_ROOT);
+    Preconditions.checkNotNull(path);
+    Path absF = fixRelativePart(path);
+    return new FileSystemLinkResolver<Path>() {
+      @Override
+      public Path doCall(final Path p) throws IOException {
+        return dfs.getEnclosingRoot(getPathName(p));
+      }
+
+      @Override
+      public Path next(final FileSystem fs, final Path p) throws IOException {
+        return fs.getEnclosingRoot(p);
+      }
+    }.resolve(this, absF);
+  }
 }

+ 8 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
 import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.hdfs.AddBlockFlag;
@@ -1868,4 +1869,11 @@ public interface ClientProtocol {
   @ReadOnly
   DatanodeInfo[] getSlowDatanodeReport() throws IOException;
 
+  /**
+   * Get the enclosing root for a path.
+   */
+  @Idempotent
+  @ReadOnly(isCoordinated = true)
+  Path getEnclosingRoot(String src) throws IOException;
+
 }

+ 18 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java

@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.QuotaUsage;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.fs.XAttr;
@@ -128,6 +129,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDat
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeReportRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeStorageReportRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEnclosingRootRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEnclosingRootResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileLinkInfoRequestProto;
@@ -1648,4 +1651,19 @@ public class ClientNamenodeProtocolTranslatorPB implements
     }
   }
 
+  @Override
+  public Path getEnclosingRoot(String filename) throws IOException {
+    final GetEnclosingRootRequestProto.Builder builder =
+        GetEnclosingRootRequestProto.newBuilder();
+    builder.setFilename(filename);
+    final GetEnclosingRootRequestProto req = builder.build();
+    try {
+      final GetEnclosingRootResponseProto response =
+          rpcProxy.getEnclosingRoot(null, req);
+      return new Path(response.getEnclosingRootPath());
+    } catch (ServiceException e) {
+      throw getRemoteException(e);
+    }
+  }
+
 }

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto

@@ -419,6 +419,14 @@ message GetPreferredBlockSizeResponseProto {
 message GetSlowDatanodeReportRequestProto {
 }
 
+message GetEnclosingRootRequestProto {
+  optional string filename = 1;
+}
+
+message GetEnclosingRootResponseProto {
+  optional string enclosingRootPath = 1;
+}
+
 message GetSlowDatanodeReportResponseProto {
   repeated DatanodeInfoProto datanodeInfoProto = 1;
 }
@@ -1069,4 +1077,6 @@ service ClientNamenodeProtocol {
       returns(HAServiceStateResponseProto);
   rpc getSlowDatanodeReport(GetSlowDatanodeReportRequestProto)
       returns(GetSlowDatanodeReportResponseProto);
+  rpc getEnclosingRoot(GetEnclosingRootRequestProto)
+      returns(GetEnclosingRootResponseProto);
 }

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/protocol/TestReadOnly.java

@@ -55,6 +55,7 @@ public class TestReadOnly {
           "listCachePools",
           "getAclStatus",
           "getEZForPath",
+          "getEnclosingRoot",
           "listEncryptionZones",
           "listReencryptionStatus",
           "getXAttrs",

+ 33 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java

@@ -142,6 +142,9 @@ public class RouterClientProtocol implements ClientProtocol {
   /** Time out when getting the mount statistics. */
   private long mountStatusTimeOut;
 
+  /** Default nameservice enabled. */
+  private final boolean defaultNameServiceEnabled;
+
   /** Identifier for the super user. */
   private String superUser;
   /** Identifier for the super group. */
@@ -190,6 +193,9 @@ public class RouterClientProtocol implements ClientProtocol {
     this.snapshotProto = new RouterSnapshot(rpcServer);
     this.routerCacheAdmin = new RouterCacheAdmin(rpcServer);
     this.securityManager = rpcServer.getRouterSecurityManager();
+    this.defaultNameServiceEnabled = conf.getBoolean(
+        RBFConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE_ENABLE,
+        RBFConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE_ENABLE_DEFAULT);
   }
 
   @Override
@@ -1797,6 +1803,33 @@ public class RouterClientProtocol implements ClientProtocol {
     return rpcServer.getSlowDatanodeReport(true, 0);
   }
 
+  @Override
+  public Path getEnclosingRoot(String src) throws IOException {
+    Path mountPath = null;
+    if (defaultNameServiceEnabled) {
+      mountPath = new Path("/");
+    }
+
+    if (subclusterResolver instanceof MountTableResolver) {
+      MountTableResolver mountTable = (MountTableResolver) subclusterResolver;
+      if (mountTable.getMountPoint(src) != null) {
+        mountPath = new Path(mountTable.getMountPoint(src).getSourcePath());
+      }
+    }
+
+    if (mountPath == null) {
+      throw new IOException(String.format("No mount point for %s", src));
+    }
+
+    EncryptionZone zone = getEZForPath(src);
+    if (zone == null) {
+      return mountPath;
+    } else {
+      Path zonePath = new Path(zone.getPath());
+      return zonePath.depth() > mountPath.depth() ? zonePath : mountPath;
+    }
+  }
+
   @Override
   public HAServiceProtocol.HAServiceState getHAServiceState() {
     if (rpcServer.isSafeMode()) {

+ 7 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java

@@ -52,6 +52,7 @@ import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.QuotaUsage;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.fs.XAttr;
@@ -1346,6 +1347,11 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
     return clientProto.getSlowDatanodeReport();
   }
 
+  @Override // ClientProtocol
+  public Path getEnclosingRoot(String src) throws IOException {
+    return clientProto.getEnclosingRoot(src);
+  }
+
   @Override // NamenodeProtocol
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size,
       long minBlockSize) throws IOException {
@@ -1793,4 +1799,4 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
     }
   }
 
-}
+}

+ 25 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTable.java

@@ -711,4 +711,29 @@ public class TestRouterMountTable {
       nnFs0.delete(new Path("/testLsMountEntryDest"), true);
     }
   }
+
+  @Test
+  public void testGetEnclosingRoot() throws Exception {
+
+    // Add a read only entry
+    MountTable readOnlyEntry = MountTable.newInstance(
+        "/readonly", Collections.singletonMap("ns0", "/testdir"));
+    readOnlyEntry.setReadOnly(true);
+    assertTrue(addMountTable(readOnlyEntry));
+    assertEquals(routerFs.getEnclosingRoot(new Path("/readonly")), new Path("/readonly"));
+
+    assertEquals(routerFs.getEnclosingRoot(new Path("/regular")), new Path("/"));
+    assertEquals(routerFs.getEnclosingRoot(new Path("/regular")),
+        routerFs.getEnclosingRoot(routerFs.getEnclosingRoot(new Path("/regular"))));
+
+    // Add a regular entry
+    MountTable regularEntry = MountTable.newInstance(
+        "/regular", Collections.singletonMap("ns0", "/testdir"));
+    assertTrue(addMountTable(regularEntry));
+    assertEquals(routerFs.getEnclosingRoot(new Path("/regular")), new Path("/regular"));
+
+    // path does not need to exist
+    assertEquals(routerFs.getEnclosingRoot(new Path("/regular/pathDNE")), new Path("/regular"));
+
+  }
 }

+ 19 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java

@@ -25,8 +25,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import org.apache.hadoop.thirdparty.protobuf.ByteString;
-import org.apache.hadoop.thirdparty.protobuf.ProtocolStringList;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
@@ -40,6 +38,7 @@ import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.QuotaUsage;
 import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
 import org.apache.hadoop.hdfs.protocol.BatchedDirectoryListing;
@@ -133,6 +132,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDat
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeStorageReportResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEnclosingRootRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEnclosingRootResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileLinkInfoRequestProto;
@@ -300,7 +301,8 @@ import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRespons
 import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto;
 import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenResponseProto;
 import org.apache.hadoop.security.token.Token;
-
+import org.apache.hadoop.thirdparty.protobuf.ByteString;
+import org.apache.hadoop.thirdparty.protobuf.ProtocolStringList;
 import org.apache.hadoop.thirdparty.protobuf.RpcController;
 import org.apache.hadoop.thirdparty.protobuf.ServiceException;
 
@@ -2050,4 +2052,18 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
       throw new ServiceException(e);
     }
   }
+
+  @Override
+  public GetEnclosingRootResponseProto getEnclosingRoot(
+      RpcController controller, GetEnclosingRootRequestProto req)
+      throws ServiceException {
+    try {
+      Path enclosingRootPath = server.getEnclosingRoot(req.getFilename());
+      return GetEnclosingRootResponseProto.newBuilder()
+          .setEnclosingRootPath(enclosingRootPath.toUri().toString())
+          .build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
 }

+ 15 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -8763,4 +8763,19 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       }
     }
   }
+
+  /**
+   * Get the enclosing root  for the specified path.
+   *
+   * @param srcArg the path of a file or directory to get the EZ for.
+   * @return the enclosing root of the path or null if none.
+   */
+  Path getEnclosingRoot(final String srcArg) throws IOException {
+    EncryptionZone ez = getEZForPath(srcArg);
+    if (ez != null) {
+      return new Path(ez.getPath());
+    } else {
+      return new Path("/");
+    }
+  }
 }

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java

@@ -2672,4 +2672,11 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     }
     return namesystem.getBlockManager().getSPSManager().getNextPathId();
   }
+
+  @Override // ClientProtocol
+  public Path getEnclosingRoot(String src)
+      throws IOException {
+    checkNNStartup();
+    return namesystem.getEnclosingRoot(src);
+  }
 }

+ 121 - 28
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemHdfs.java

@@ -57,6 +57,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERV
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY;
 import static org.apache.hadoop.fs.FileSystem.TRASH_PREFIX;
 
+import org.apache.hadoop.test.LambdaTestUtils;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -191,34 +192,38 @@ public class TestViewFileSystemHdfs extends ViewFileSystemBaseTest {
 
   @Test
   public void testTrashRootsAfterEncryptionZoneDeletion() throws Exception {
-    final Path zone = new Path("/EZ");
-    fsTarget.mkdirs(zone);
-    final Path zone1 = new Path("/EZ/zone1");
-    fsTarget.mkdirs(zone1);
-
-    DFSTestUtil.createKey("test_key", cluster, CONF);
-    HdfsAdmin hdfsAdmin = new HdfsAdmin(cluster.getURI(0), CONF);
-    final EnumSet<CreateEncryptionZoneFlag> provisionTrash =
-        EnumSet.of(CreateEncryptionZoneFlag.PROVISION_TRASH);
-    hdfsAdmin.createEncryptionZone(zone1, "test_key", provisionTrash);
-
-    final Path encFile = new Path(zone1, "encFile");
-    DFSTestUtil.createFile(fsTarget, encFile, 10240, (short) 1, 0xFEED);
-
-    Configuration clientConf = new Configuration(CONF);
-    clientConf.setLong(FS_TRASH_INTERVAL_KEY, 1);
-    clientConf.set("fs.default.name", fsTarget.getUri().toString());
-    FsShell shell = new FsShell(clientConf);
-
-    //Verify file deletion within EZ
-    DFSTestUtil.verifyDelete(shell, fsTarget, encFile, true);
-    assertTrue("ViewFileSystem trash roots should include EZ file trash",
-        (fsView.getTrashRoots(true).size() == 1));
-
-    //Verify deletion of EZ
-    DFSTestUtil.verifyDelete(shell, fsTarget, zone, true);
-    assertTrue("ViewFileSystem trash roots should include EZ zone trash",
-        (fsView.getTrashRoots(true).size() == 2));
+    try {
+      final Path zone = new Path("/EZ");
+      fsTarget.mkdirs(zone);
+      final Path zone1 = new Path("/EZ/zone1");
+      fsTarget.mkdirs(zone1);
+
+      DFSTestUtil.createKey("test_key", cluster, CONF);
+      HdfsAdmin hdfsAdmin = new HdfsAdmin(cluster.getURI(0), CONF);
+      final EnumSet<CreateEncryptionZoneFlag> provisionTrash =
+          EnumSet.of(CreateEncryptionZoneFlag.PROVISION_TRASH);
+      hdfsAdmin.createEncryptionZone(zone1, "test_key", provisionTrash);
+
+      final Path encFile = new Path(zone1, "encFile");
+      DFSTestUtil.createFile(fsTarget, encFile, 10240, (short) 1, 0xFEED);
+
+      Configuration clientConf = new Configuration(CONF);
+      clientConf.setLong(FS_TRASH_INTERVAL_KEY, 1);
+      clientConf.set("fs.default.name", fsTarget.getUri().toString());
+      FsShell shell = new FsShell(clientConf);
+
+      //Verify file deletion within EZ
+      DFSTestUtil.verifyDelete(shell, fsTarget, encFile, true);
+      assertTrue("ViewFileSystem trash roots should include EZ file trash",
+          (fsView.getTrashRoots(true).size() == 1));
+
+      //Verify deletion of EZ
+      DFSTestUtil.verifyDelete(shell, fsTarget, zone, true);
+      assertTrue("ViewFileSystem trash roots should include EZ zone trash",
+          (fsView.getTrashRoots(true).size() == 2));
+    } finally {
+      DFSTestUtil.deleteKey("test_key", cluster);
+    }
   }
 
   @Test
@@ -506,4 +511,92 @@ public class TestViewFileSystemHdfs extends ViewFileSystemBaseTest {
     assertEquals(fs.getFileStatus(subDirOfInternalDir).getPermission(),
         fs.getFileStatus(subDirOfRealDir).getPermission());
   }
+
+  private Path getViewFsPath(Path path, FileSystem fs) {
+    return fs.makeQualified(path);
+  }
+
+  private Path getViewFsPath(String path, FileSystem fs) {
+    return getViewFsPath(new Path(path), fs);
+  }
+
+  @Test
+  public void testEnclosingRootsBase() throws Exception {
+    try {
+      final Path zone = new Path("/data/EZ");
+      fsTarget.mkdirs(zone);
+      final Path zone1 = new Path("/data/EZ/zone1");
+      fsTarget.mkdirs(zone1);
+
+      DFSTestUtil.createKey("test_key", cluster, 0, CONF);
+      HdfsAdmin hdfsAdmin = new HdfsAdmin(cluster.getURI(0), CONF);
+      final EnumSet<CreateEncryptionZoneFlag> provisionTrash =
+          EnumSet.of(CreateEncryptionZoneFlag.PROVISION_TRASH);
+      hdfsAdmin.createEncryptionZone(zone1, "test_key", provisionTrash);
+      assertEquals(fsView.getEnclosingRoot(zone), getViewFsPath("/data", fsView));
+      assertEquals(fsView.getEnclosingRoot(zone1), getViewFsPath(zone1, fsView));
+
+      Path nn02Ez = new Path("/mountOnNn2/EZ");
+      fsTarget2.mkdirs(nn02Ez);
+      assertEquals(fsView.getEnclosingRoot((nn02Ez)), getViewFsPath("/mountOnNn2", fsView));
+      HdfsAdmin hdfsAdmin2 = new HdfsAdmin(cluster.getURI(1), CONF);
+      DFSTestUtil.createKey("test_key", cluster, 1, CONF);
+      hdfsAdmin2.createEncryptionZone(nn02Ez, "test_key", provisionTrash);
+      assertEquals(fsView.getEnclosingRoot((nn02Ez)), getViewFsPath(nn02Ez, fsView));
+      assertEquals(fsView.getEnclosingRoot(new Path(nn02Ez, "dir/dir2/file")),
+          getViewFsPath(nn02Ez, fsView));
+
+      // With viewfs:// scheme
+      assertEquals(fsView.getEnclosingRoot(fsView.getWorkingDirectory()),
+          getViewFsPath("/user", fsView));
+    } finally {
+      DFSTestUtil.deleteKey("test_key", cluster, 0);
+    }
+  }
+
+  @Test
+  public void testEnclosingRootFailure() throws Exception {
+    LambdaTestUtils.intercept(NotInMountpointException.class,
+        ()-> fsView.getEnclosingRoot(new Path("/does/not/exist")));
+
+    final Path zone = new Path("/data/EZ");
+    Path fs1 = fsTarget.makeQualified(zone);
+
+    LambdaTestUtils.intercept(IllegalArgumentException.class,
+        ()-> fsView.getEnclosingRoot(fs1));
+    LambdaTestUtils.intercept(IllegalArgumentException.class,
+        ()-> fsView.getEnclosingRoot(new Path("hdfs://fakeAuthority/")));
+  }
+
+  @Test
+  public void testEnclosingRootWrapped() throws Exception {
+    try {
+      final Path zone = new Path("/data/EZ");
+      fsTarget.mkdirs(zone);
+      final Path zone1 = new Path("/data/EZ/testZone1");
+      fsTarget.mkdirs(zone1);
+
+      DFSTestUtil.createKey("test_key", cluster, 0, CONF);
+      HdfsAdmin hdfsAdmin = new HdfsAdmin(cluster.getURI(0), CONF);
+      final EnumSet<CreateEncryptionZoneFlag> provisionTrash =
+          EnumSet.of(CreateEncryptionZoneFlag.PROVISION_TRASH);
+      hdfsAdmin.createEncryptionZone(zone1, "test_key", provisionTrash);
+
+      UserGroupInformation ugi = UserGroupInformation.createRemoteUser("foo");
+      Path p = ugi.doAs((PrivilegedExceptionAction<Path>) () -> {
+        FileSystem wFs = FileSystem.get(FsConstants.VIEWFS_URI, this.conf);
+        return wFs.getEnclosingRoot(zone);
+      });
+      assertEquals(p, getViewFsPath("/data", fsView));
+      p = ugi.doAs((PrivilegedExceptionAction<Path>) () -> {
+        FileSystem wFs = FileSystem.get(FsConstants.VIEWFS_URI, this.conf);
+        return wFs.getEnclosingRoot(zone1);
+      });
+      assertEquals(p, getViewFsPath(zone1, fsView));
+
+
+    } finally {
+      DFSTestUtil.deleteKey("test_key", cluster, 0);
+    }
+  }
 }

+ 27 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java

@@ -1880,6 +1880,33 @@ public class DFSTestUtil {
     provider.flush();
   }
 
+  /**
+   * Helper function to delete a key in the Key Provider. Defaults
+   * to the first indexed NameNode's Key Provider.
+   *
+   * @param keyName The name of the key to create
+   * @param cluster The cluster to create it in
+   */
+  public static void deleteKey(String keyName, MiniDFSCluster cluster)
+      throws NoSuchAlgorithmException, IOException {
+    deleteKey(keyName, cluster, 0);
+  }
+
+  /**
+   * Helper function to delete a key in the Key Provider.
+   *
+   * @param keyName The name of the key to create
+   * @param cluster The cluster to create it in
+   * @param idx The NameNode index
+   */
+  public static void deleteKey(String keyName, MiniDFSCluster cluster, int idx)
+      throws NoSuchAlgorithmException, IOException {
+    NameNode nn = cluster.getNameNode(idx);
+    KeyProvider provider = nn.getNamesystem().getProvider();
+    provider.deleteKey(keyName);
+    provider.flush();
+  }
+
   /**
    * @return the node which is expected to run the recovery of the
    * given block, which is known to be under construction inside the

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java

@@ -923,6 +923,11 @@ public class TestDistributedFileSystem {
       checkStatistics(dfs, ++readOps, writeOps, 0);
       checkOpStatistics(OpType.GET_ENCRYPTION_ZONE, opCount + 1);
 
+      opCount = getOpStatistics(OpType.GET_ENCLOSING_ROOT);
+      dfs.getEnclosingRoot(dir);
+      checkStatistics(dfs, ++readOps, writeOps, 0);
+      checkOpStatistics(OpType.GET_ENCLOSING_ROOT, opCount + 1);
+
       opCount = getOpStatistics(OpType.GET_SNAPSHOTTABLE_DIRECTORY_LIST);
       dfs.getSnapshottableDirListing();
       checkStatistics(dfs, ++readOps, writeOps, 0);

+ 149 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEnclosingRoot.java

@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.File;
+import java.util.EnumSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.CreateEncryptionZoneFlag;
+import org.apache.hadoop.hdfs.client.HdfsAdmin;
+import org.apache.hadoop.hdfs.server.namenode.EncryptionFaultInjector;
+import org.apache.hadoop.hdfs.server.namenode.EncryptionZoneManager;
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class TestEnclosingRoot extends AbstractHadoopTestBase {
+  private static final Logger LOG = LoggerFactory.getLogger(TestEnclosingRoot.class);
+  private static final String TEST_KEY = "test_key";
+  private static final EnumSet<CreateEncryptionZoneFlag> NO_TRASH =
+      EnumSet.of(CreateEncryptionZoneFlag.NO_TRASH);
+
+  private Configuration conf;
+  private FileSystemTestHelper fsHelper;
+
+  private MiniDFSCluster cluster;
+  private HdfsAdmin dfsAdmin;
+  private DistributedFileSystem fs;
+  private File testRootDir;
+
+  private String getKeyProviderURI() {
+    return JavaKeyStoreProvider.SCHEME_NAME + "://file" +
+        new Path(testRootDir.toString(), "test.jks").toUri();
+  }
+
+  @Before
+  public void setup() throws Exception {
+    conf = new HdfsConfiguration();
+    fsHelper = new FileSystemTestHelper();
+    // Set up java key store
+    String testRoot = fsHelper.getTestRootDir();
+    testRootDir = new File(testRoot).getAbsoluteFile();
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
+        getKeyProviderURI());
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
+    // Lower the batch size for testing
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES,
+        2);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    GenericTestUtils.setLogLevel(
+        LoggerFactory.getLogger(EncryptionZoneManager.class), Level.TRACE);
+    fs = cluster.getFileSystem();
+    dfsAdmin = new HdfsAdmin(cluster.getURI(), conf);
+    setProvider();
+    // Create a test key
+    DFSTestUtil.createKey(TEST_KEY, cluster, conf);
+  }
+
+  protected void setProvider() {
+    // Need to set the client's KeyProvider to the NN's for JKS,
+    // else the updates do not get flushed properly
+    fs.getClient().setKeyProvider(cluster.getNameNode().getNamesystem()
+        .getProvider());
+  }
+
+  @After
+  public void teardown() {
+    try {
+      if (cluster != null) {
+        cluster.shutdown();
+        cluster = null;
+      }
+    } finally {
+      EncryptionFaultInjector.instance = new EncryptionFaultInjector();
+    }
+  }
+
+  @Test
+  /**
+   * Testing basic operations for getEnclosingRoot with dfs/DistributedFileSystem
+   */
+  public void testBasicOperations() throws Exception {
+    final Path rootDir = new Path("/");
+    final Path zone1 = new Path(rootDir, "zone1");
+
+    // Ensure that the root "/" returns the root without mount points or encryption zones
+    assertThat(fs.getEnclosingRoot(rootDir))
+        .describedAs("enclosing root of %s", rootDir)
+        .isEqualTo(rootDir);
+
+    // Ensure a dir returns the root without mount points or encryption zones
+    assertThat(fs.getEnclosingRoot(zone1))
+        .describedAs("enclosing root of %s", zone1)
+        .isEqualTo(rootDir);
+
+    // create an encryption zone
+    fs.mkdirs(zone1);
+    dfsAdmin.createEncryptionZone(zone1, TEST_KEY, NO_TRASH);
+
+    // Ensure that the root "/" returns the root with an encryption zone present
+    assertThat(fs.getEnclosingRoot(rootDir))
+        .describedAs("enclosing root of %s", rootDir)
+        .isEqualTo(rootDir);
+
+    // Ensure that the encryption zone path itself returns correctly as itself
+    assertThat(fs.getEnclosingRoot(zone1))
+        .describedAs("enclosing root of %s", zone1)
+        .isEqualTo(zone1);
+
+    // Ensure that a path where the file does not exist returns the encryption zone root path
+    final Path zone1FileDNE = new Path(zone1, "newDNE.txt");
+    assertThat(fs.getEnclosingRoot(zone1FileDNE))
+        .describedAs("enclosing root of %s", zone1FileDNE)
+        .isEqualTo(zone1);
+
+    // Ensure that a path where the dir does not exist returns the encryption zone root path
+    final Path zone1DirDNE = new Path(zone1, "zone2/newDNE.txt");
+    assertThat(fs.getEnclosingRoot(zone1DirDNE))
+        .describedAs("enclosing root of %s", zone1DirDNE)
+        .isEqualTo(zone1);
+  }
+}