Explorar o código

HDFS-6843. Create FileStatus isEncrypted() method (clamb via cmccabe)

Colin Patrick Mccabe %!s(int64=10) %!d(string=hai) anos
pai
achega
e3803d002c

+ 9 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileStatus.java

@@ -200,6 +200,15 @@ public class FileStatus implements Writable, Comparable {
   public FsPermission getPermission() {
     return permission;
   }
+
+  /**
+   * Tell whether the underlying file or directory is encrypted or not.
+   *
+   * @return true if the underlying file is encrypted.
+   */
+  public boolean isEncrypted() {
+    return permission.getEncryptedBit();
+  }
   
   /**
    * Get the owner of the file.

+ 7 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/permission/FsPermission.java

@@ -294,6 +294,13 @@ public class FsPermission implements Writable {
     return false;
   }
 
+  /**
+   * Returns true if the file is encrypted or directory is in an encryption zone
+   */
+  public boolean getEncryptedBit() {
+    return false;
+  }
+
   /** Set the user file creation mask (umask) */
   public static void setUMask(Configuration conf, FsPermission umask) {
     conf.set(UMASK_LABEL, String.format("%1$03o", umask.toShort()));

+ 31 - 0
hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md

@@ -64,6 +64,33 @@ all operations on a valid FileSystem MUST result in a new FileSystem that is als
 
     def isSymlink(FS, p) = p in symlinks(FS)
 
+### 'boolean inEncryptionZone(Path p)'
+
+Return True if the data for p is encrypted. The nature of the encryption and the
+mechanism for creating an encryption zone are implementation details not covered
+in this specification. No guarantees are made about the quality of the
+encryption. The metadata is not encrypted.
+
+#### Preconditions
+
+    if not exists(FS, p) : raise FileNotFoundException
+
+#### Postconditions
+
+#### Invariants
+
+All files and directories under a directory in an encryption zone are also in an
+encryption zone
+
+    forall d in directories(FS): inEncyptionZone(FS, d) implies
+      forall c in children(FS, d) where (isFile(FS, c) or isDir(FS, c)) :
+        inEncyptionZone(FS, c)
+
+For all files in an encrypted zone, the data is encrypted, but the encryption
+type and specification are not defined.
+
+      forall f in files(FS) where  inEncyptionZone(FS, c):
+        isEncrypted(data(f))
 
 ### `FileStatus getFileStatus(Path p)`
 
@@ -88,6 +115,10 @@ Get the status of a path
             stat.length = 0
             stat.isdir = False
             stat.symlink = FS.Symlinks[p]
+        if inEncryptionZone(FS, p) :
+            stat.isEncrypted = True
+        else
+            stat.isEncrypted = False
 
 ### `Path getHomeDirectory()`
 

+ 12 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.fs.contract;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 import org.junit.Test;
@@ -30,6 +31,7 @@ import java.io.IOException;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.rm;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
 
 /**
@@ -65,6 +67,16 @@ public abstract class AbstractContractOpenTest extends AbstractFSContractTestBas
     assertMinusOne("initial byte read", result);
   }
 
+  @Test
+  public void testFsIsEncrypted() throws Exception {
+      describe("create an empty file and call FileStatus.isEncrypted()");
+      final Path path = path("file");
+      createFile(getFileSystem(), path, false, new byte[0]);
+      final FileStatus stat = getFileSystem().getFileStatus(path);
+      assertFalse("Expecting false for stat.isEncrypted()",
+          stat.isEncrypted());
+  }
+
   @Test
   public void testOpenReadDir() throws Throwable {
     describe("create & read a directory");

+ 21 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/FsAclPermission.java → hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/FsPermissionExtension.java

@@ -21,39 +21,46 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.permission.FsPermission;
 
 /**
- * HDFS permission subclass used to indicate an ACL is present.  The ACL bit is
- * not visible directly to users of {@link FsPermission} serialization.  This is
+ * HDFS permission subclass used to indicate an ACL is present and/or that the
+ * underlying file/dir is encrypted. The ACL/encrypted bits are not visible
+ * directly to users of {@link FsPermission} serialization.  This is
  * done for backwards compatibility in case any existing clients assume the
  * value of FsPermission is in a particular range.
  */
 @InterfaceAudience.Private
-public class FsAclPermission extends FsPermission {
+public class FsPermissionExtension extends FsPermission {
   private final static short ACL_BIT = 1 << 12;
+  private final static short ENCRYPTED_BIT = 1 << 13;
   private final boolean aclBit;
+  private final boolean encryptedBit;
 
   /**
-   * Constructs a new FsAclPermission based on the given FsPermission.
+   * Constructs a new FsPermissionExtension based on the given FsPermission.
    *
    * @param perm FsPermission containing permission bits
    */
-  public FsAclPermission(FsPermission perm) {
+  public FsPermissionExtension(FsPermission perm, boolean hasAcl,
+      boolean isEncrypted) {
     super(perm.toShort());
-    aclBit = true;
+    aclBit = hasAcl;
+    encryptedBit = isEncrypted;
   }
 
   /**
-   * Creates a new FsAclPermission by calling the base class constructor.
+   * Creates a new FsPermissionExtension by calling the base class constructor.
    *
    * @param perm short containing permission bits
    */
-  public FsAclPermission(short perm) {
+  public FsPermissionExtension(short perm) {
     super(perm);
     aclBit = (perm & ACL_BIT) != 0;
+    encryptedBit = (perm & ENCRYPTED_BIT) != 0;
   }
 
   @Override
   public short toExtendedShort() {
-    return (short)(toShort() | (aclBit ? ACL_BIT : 0));
+    return (short)(toShort() |
+        (aclBit ? ACL_BIT : 0) | (encryptedBit ? ENCRYPTED_BIT : 0));
   }
 
   @Override
@@ -61,6 +68,11 @@ public class FsAclPermission extends FsPermission {
     return aclBit;
   }
 
+  @Override
+  public boolean getEncryptedBit() {
+    return encryptedBit;
+  }
+
   @Override
   public boolean equals(Object o) {
     // This intentionally delegates to the base class.  This is only overridden

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java

@@ -66,7 +66,7 @@ import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.fs.FileEncryptionInfo;
-import org.apache.hadoop.hdfs.protocol.FsAclPermission;
+import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
@@ -1264,7 +1264,7 @@ public class PBHelper {
   }
   
   public static FsPermission convert(FsPermissionProto p) {
-    return new FsAclPermission((short)p.getPerm());
+    return new FsPermissionExtension((short)p.getPerm());
   }
   
   

+ 26 - 10
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -63,7 +63,7 @@ import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.FSLimitException.MaxDirectoryItemsExceededException;
 import org.apache.hadoop.hdfs.protocol.FSLimitException.PathComponentTooLongException;
-import org.apache.hadoop.hdfs.protocol.FsAclPermission;
+import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
@@ -2313,18 +2313,25 @@ public class FSDirectory implements Closeable {
      long size = 0;     // length is zero for directories
      short replication = 0;
      long blocksize = 0;
+     final boolean isEncrypted;
+
+     final FileEncryptionInfo feInfo = isRawPath ? null :
+         getFileEncryptionInfo(node, snapshot);
+
      if (node.isFile()) {
        final INodeFile fileNode = node.asFile();
        size = fileNode.computeFileSize(snapshot);
        replication = fileNode.getFileReplication(snapshot);
        blocksize = fileNode.getPreferredBlockSize();
+       isEncrypted = (feInfo != null) ||
+           (isRawPath && isInAnEZ(INodesInPath.fromINode(node)));
+     } else {
+       isEncrypted = isInAnEZ(INodesInPath.fromINode(node));
      }
+
      int childrenNum = node.isDirectory() ? 
          node.asDirectory().getChildrenNum(snapshot) : 0;
 
-     FileEncryptionInfo feInfo = isRawPath ? null :
-         getFileEncryptionInfo(node, snapshot);
-
      return new HdfsFileStatus(
         size, 
         node.isDirectory(), 
@@ -2332,7 +2339,7 @@ public class FSDirectory implements Closeable {
         blocksize,
         node.getModificationTime(snapshot),
         node.getAccessTime(snapshot),
-        getPermissionForFileStatus(node, snapshot),
+        getPermissionForFileStatus(node, snapshot, isEncrypted),
         node.getUserName(snapshot),
         node.getGroupName(snapshot),
         node.isSymlink() ? node.asSymlink().getSymlink() : null,
@@ -2352,6 +2359,7 @@ public class FSDirectory implements Closeable {
     short replication = 0;
     long blocksize = 0;
     LocatedBlocks loc = null;
+    final boolean isEncrypted;
     final FileEncryptionInfo feInfo = isRawPath ? null :
         getFileEncryptionInfo(node, snapshot);
     if (node.isFile()) {
@@ -2371,6 +2379,10 @@ public class FSDirectory implements Closeable {
       if (loc == null) {
         loc = new LocatedBlocks();
       }
+      isEncrypted = (feInfo != null) ||
+          (isRawPath && isInAnEZ(INodesInPath.fromINode(node)));
+    } else {
+      isEncrypted = isInAnEZ(INodesInPath.fromINode(node));
     }
     int childrenNum = node.isDirectory() ? 
         node.asDirectory().getChildrenNum(snapshot) : 0;
@@ -2379,7 +2391,7 @@ public class FSDirectory implements Closeable {
         new HdfsLocatedFileStatus(size, node.isDirectory(), replication,
           blocksize, node.getModificationTime(snapshot),
           node.getAccessTime(snapshot),
-          getPermissionForFileStatus(node, snapshot),
+          getPermissionForFileStatus(node, snapshot, isEncrypted),
           node.getUserName(snapshot), node.getGroupName(snapshot),
           node.isSymlink() ? node.asSymlink().getSymlink() : null, path,
           node.getId(), loc, childrenNum, feInfo);
@@ -2395,17 +2407,21 @@ public class FSDirectory implements Closeable {
 
   /**
    * Returns an inode's FsPermission for use in an outbound FileStatus.  If the
-   * inode has an ACL, then this method will convert to a FsAclPermission.
+   * inode has an ACL or is for an encrypted file/dir, then this method will
+   * return an FsPermissionExtension.
    *
    * @param node INode to check
    * @param snapshot int snapshot ID
+   * @param isEncrypted boolean true if the file/dir is encrypted
    * @return FsPermission from inode, with ACL bit on if the inode has an ACL
+   * and encrypted bit on if it represents an encrypted file/dir.
    */
   private static FsPermission getPermissionForFileStatus(INode node,
-      int snapshot) {
+      int snapshot, boolean isEncrypted) {
     FsPermission perm = node.getFsPermission(snapshot);
-    if (node.getAclFeature(snapshot) != null) {
-      perm = new FsAclPermission(perm);
+    boolean hasAcl = node.getAclFeature(snapshot) != null;
+    if (hasAcl || isEncrypted) {
+      perm = new FsPermissionExtension(perm, hasAcl, isEncrypted);
     }
     return perm;
   }

+ 13 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java

@@ -180,9 +180,16 @@ public class JsonUtil {
   }
 
   /** Convert a string to a FsPermission object. */
-  private static FsPermission toFsPermission(final String s, Boolean aclBit) {
+  private static FsPermission toFsPermission(final String s, Boolean aclBit,
+      Boolean encBit) {
     FsPermission perm = new FsPermission(Short.parseShort(s, 8));
-    return (aclBit != null && aclBit) ? new FsAclPermission(perm) : perm;
+    final boolean aBit = (aclBit != null) ? aclBit : false;
+    final boolean eBit = (encBit != null) ? encBit : false;
+    if (aBit || eBit) {
+      return new FsPermissionExtension(perm, aBit, eBit);
+    } else {
+      return perm;
+    }
   }
 
   static enum PathType {
@@ -214,6 +221,9 @@ public class JsonUtil {
     if (perm.getAclBit()) {
       m.put("aclBit", true);
     }
+    if (perm.getEncryptedBit()) {
+      m.put("encBit", true);
+    }
     m.put("accessTime", status.getAccessTime());
     m.put("modificationTime", status.getModificationTime());
     m.put("blockSize", status.getBlockSize());
@@ -240,7 +250,7 @@ public class JsonUtil {
     final String owner = (String) m.get("owner");
     final String group = (String) m.get("group");
     final FsPermission permission = toFsPermission((String) m.get("permission"),
-      (Boolean)m.get("aclBit"));
+      (Boolean)m.get("aclBit"), (Boolean)m.get("encBit"));
     final long aTime = (Long) m.get("accessTime");
     final long mTime = (Long) m.get("modificationTime");
     final long blockSize = (Long) m.get("blockSize");

+ 88 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java

@@ -47,6 +47,7 @@ import org.apache.hadoop.fs.FSTestWrapper;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileContextTestWrapper;
 import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.FileSystemTestWrapper;
@@ -712,6 +713,93 @@ public class TestEncryptionZones {
     assertNumZones(0);
   }
 
+  @Test(timeout = 120000)
+  public void testIsEncryptedMethod() throws Exception {
+    doTestIsEncryptedMethod(new Path("/"));
+    doTestIsEncryptedMethod(new Path("/.reserved/raw"));
+  }
+
+  private void doTestIsEncryptedMethod(Path prefix) throws Exception {
+    try {
+      dTIEM(prefix);
+    } finally {
+      for (FileStatus s : fsWrapper.listStatus(prefix)) {
+        fsWrapper.delete(s.getPath(), true);
+      }
+    }
+  }
+
+  private void dTIEM(Path prefix) throws Exception {
+    final HdfsAdmin dfsAdmin =
+      new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
+    // Create an unencrypted file to check isEncrypted returns false
+    final Path baseFile = new Path(prefix, "base");
+    fsWrapper.createFile(baseFile);
+    FileStatus stat = fsWrapper.getFileStatus(baseFile);
+    assertFalse("Expected isEncrypted to return false for " + baseFile,
+        stat.isEncrypted());
+
+    // Create an encrypted file to check isEncrypted returns true
+    final Path zone = new Path(prefix, "zone");
+    fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true);
+    dfsAdmin.createEncryptionZone(zone, TEST_KEY);
+    final Path encFile = new Path(zone, "encfile");
+    fsWrapper.createFile(encFile);
+    stat = fsWrapper.getFileStatus(encFile);
+    assertTrue("Expected isEncrypted to return true for enc file" + encFile,
+        stat.isEncrypted());
+
+    // check that it returns true for an ez root
+    stat = fsWrapper.getFileStatus(zone);
+    assertTrue("Expected isEncrypted to return true for ezroot",
+        stat.isEncrypted());
+
+    // check that it returns true for a dir in the ez
+    final Path zoneSubdir = new Path(zone, "subdir");
+    fsWrapper.mkdir(zoneSubdir, FsPermission.getDirDefault(), true);
+    stat = fsWrapper.getFileStatus(zoneSubdir);
+    assertTrue(
+        "Expected isEncrypted to return true for ez subdir " + zoneSubdir,
+        stat.isEncrypted());
+
+    // check that it returns false for a non ez dir
+    final Path nonEzDirPath = new Path(prefix, "nonzone");
+    fsWrapper.mkdir(nonEzDirPath, FsPermission.getDirDefault(), true);
+    stat = fsWrapper.getFileStatus(nonEzDirPath);
+    assertFalse(
+        "Expected isEncrypted to return false for directory " + nonEzDirPath,
+        stat.isEncrypted());
+
+    // check that it returns true for listings within an ez
+    FileStatus[] statuses = fsWrapper.listStatus(zone);
+    for (FileStatus s : statuses) {
+      assertTrue("Expected isEncrypted to return true for ez stat " + zone,
+          s.isEncrypted());
+    }
+
+    statuses = fsWrapper.listStatus(encFile);
+    for (FileStatus s : statuses) {
+      assertTrue(
+          "Expected isEncrypted to return true for ez file stat " + encFile,
+          s.isEncrypted());
+    }
+
+    // check that it returns false for listings outside an ez
+    statuses = fsWrapper.listStatus(nonEzDirPath);
+    for (FileStatus s : statuses) {
+      assertFalse(
+          "Expected isEncrypted to return false for nonez stat " + nonEzDirPath,
+          s.isEncrypted());
+    }
+
+    statuses = fsWrapper.listStatus(baseFile);
+    for (FileStatus s : statuses) {
+      assertFalse(
+          "Expected isEncrypted to return false for non ez stat " + baseFile,
+          s.isEncrypted());
+    }
+  }
+
   private class MyInjector extends EncryptionFaultInjector {
     int generateCount;
     CountDownLatch ready;

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java

@@ -39,7 +39,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.AclException;
-import org.apache.hadoop.hdfs.protocol.FsAclPermission;
+import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -822,7 +822,8 @@ public abstract class FSAclBaseTest {
     fs.setPermission(path, FsPermission.createImmutable((short)0700));
     assertPermission((short)0700);
     fs.setPermission(path,
-      new FsAclPermission(FsPermission.createImmutable((short)0755)));
+      new FsPermissionExtension(FsPermission.
+          createImmutable((short)0755), true, true));
     INode inode = cluster.getNamesystem().getFSDirectory().getNode(
       path.toUri().getPath(), false);
     assertNotNull(inode);