Explorar o código

HDFS-4765. Permission check of symlink deletion incorrectly throws UnresolvedLinkException. Contributed by Andrew Wang

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1495603 13f79535-47bb-0310-9956-ffa450edef68
Jason Darrell Lowe %!s(int64=12) %!d(string=hai) anos
pai
achega
3aaa87197c

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -24,6 +24,9 @@ Release 0.23.9 - UNRELEASED
     HDFS-4878. On Remove Block, block is not removed from neededReplications
     queue. (Tao Luo via shv)
 
+    HDFS-4765. Permission check of symlink deletion incorrectly throws
+    UnresolvedLinkException. (Andrew Wang via jlowe)
+
 Release 0.23.8 - 2013-06-05
   
   INCOMPATIBLE CHANGES

+ 18 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -1920,7 +1920,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   /**
    * Remove the indicated file from namespace.
    * 
-   * @see ClientProtocol#delete(String, boolean) for detailed descriptoin and 
+   * @see ClientProtocol#delete(String, boolean) for detailed description and 
    * description of exceptions
    */
     boolean delete(String src, boolean recursive)
@@ -1966,7 +1966,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         throw new IOException(src + " is non empty");
       }
       if (enforcePermission && isPermissionEnabled) {
-        checkPermission(pc, src, false, null, FsAction.WRITE, null, FsAction.ALL);
+        checkPermission(pc, src, false, null, FsAction.WRITE, null,
+            FsAction.ALL, false);
       }
       // Unlink the target directory from directory tree
       if (!dir.delete(src, collectedBlocks)) {
@@ -3619,13 +3620,27 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   private void checkPermission(FSPermissionChecker pc,
       String path, boolean doCheckOwner, FsAction ancestorAccess,
       FsAction parentAccess, FsAction access, FsAction subAccess)
+      throws AccessControlException, UnresolvedLinkException {
+        checkPermission(pc, path, doCheckOwner, ancestorAccess,
+            parentAccess, access, subAccess, true);
+  }
+
+  /**
+   * Check whether current user have permissions to access the path. For more
+   * details of the parameters, see
+   * {@link FSPermissionChecker#checkPermission()}.
+   */
+  private void checkPermission(FSPermissionChecker pc,
+      String path, boolean doCheckOwner, FsAction ancestorAccess,
+      FsAction parentAccess, FsAction access, FsAction subAccess,
+      boolean resolveLink)
       throws AccessControlException, UnresolvedLinkException {
     if (!pc.isSuperUser()) {
       dir.waitForReady();
       readLock();
       try {
         pc.checkPermission(path, dir.rootDir, doCheckOwner, ancestorAccess,
-            parentAccess, access, subAccess);
+            parentAccess, access, subAccess, resolveLink);
       } finally {
         readUnlock();
       }

+ 31 - 28
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java

@@ -112,6 +112,8 @@ class FSPermissionChecker {
    * @param subAccess If path is a directory,
    * it is the access required of the path and all the sub-directories.
    * If path is not a directory, there is no effect.
+   * @param resolveLink whether to resolve the final path component if it is
+   * a symlink
    * @throws AccessControlException
    * @throws UnresolvedLinkException
    * 
@@ -120,7 +122,7 @@ class FSPermissionChecker {
    */
   void checkPermission(String path, INodeDirectory root, boolean doCheckOwner,
       FsAction ancestorAccess, FsAction parentAccess, FsAction access,
-      FsAction subAccess) 
+      FsAction subAccess, boolean resolveLink)
       throws AccessControlException, UnresolvedLinkException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("ACCESS CHECK: " + this
@@ -128,35 +130,36 @@ class FSPermissionChecker {
           + ", ancestorAccess=" + ancestorAccess
           + ", parentAccess=" + parentAccess
           + ", access=" + access
-          + ", subAccess=" + subAccess);
+          + ", subAccess=" + subAccess
+          + ", resolveLink=" + resolveLink);
     }
     // check if (parentAccess != null) && file exists, then check sb
-      // Resolve symlinks, the check is performed on the link target.
-      INode[] inodes = root.getExistingPathINodes(path, true);
-      int ancestorIndex = inodes.length - 2;
-      for(; ancestorIndex >= 0 && inodes[ancestorIndex] == null;
-          ancestorIndex--);
-      checkTraverse(inodes, ancestorIndex);
-
-      if (parentAccess != null && parentAccess.implies(FsAction.WRITE)
-          && inodes.length > 1 && inodes[inodes.length - 1] != null) {
-        checkStickyBit(inodes[inodes.length - 2], inodes[inodes.length - 1]);
-      }
-      if (ancestorAccess != null && inodes.length > 1) {
-        check(inodes, ancestorIndex, ancestorAccess);
-      }
-      if (parentAccess != null && inodes.length > 1) {
-        check(inodes, inodes.length - 2, parentAccess);
-      }
-      if (access != null) {
-        check(inodes[inodes.length - 1], access);
-      }
-      if (subAccess != null) {
-        checkSubAccess(inodes[inodes.length - 1], subAccess);
-      }
-      if (doCheckOwner) {
-        checkOwner(inodes[inodes.length - 1]);
-      }
+    // If resolveLink, the check is performed on the link target.
+    final INode[] inodes = root.getExistingPathINodes(path, resolveLink);
+    int ancestorIndex = inodes.length - 2;
+    for(; ancestorIndex >= 0 && inodes[ancestorIndex] == null;
+        ancestorIndex--);
+    checkTraverse(inodes, ancestorIndex);
+
+    if (parentAccess != null && parentAccess.implies(FsAction.WRITE)
+        && inodes.length > 1 && inodes[inodes.length - 1] != null) {
+      checkStickyBit(inodes[inodes.length - 2], inodes[inodes.length - 1]);
+    }
+    if (ancestorAccess != null && inodes.length > 1) {
+      check(inodes, ancestorIndex, ancestorAccess);
+    }
+    if (parentAccess != null && inodes.length > 1) {
+      check(inodes, inodes.length - 2, parentAccess);
+    }
+    if (access != null) {
+      check(inodes[inodes.length - 1], access);
+    }
+    if (subAccess != null) {
+      checkSubAccess(inodes[inodes.length - 1], subAccess);
+    }
+    if (doCheckOwner) {
+      checkOwner(inodes[inodes.length - 1]);
+    }
   }
 
   /** Guarded by {@link FSNamesystem#readLock()} */

+ 89 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/security/TestPermission.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.security;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.Random;
 
 import org.apache.commons.logging.Log;
@@ -29,6 +30,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.permission.*;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.StringUtils;
 
 import junit.framework.TestCase;
@@ -263,4 +265,91 @@ public class TestPermission extends TestCase {
       return false;
     }
   }
+
+  public void testSymlinkPermissions() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
+    conf.set(FsPermission.UMASK_LABEL, "000");
+    MiniDFSCluster cluster = null;
+    FileSystem fs = null;
+
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+      cluster.waitActive();
+      FileContext fc = FileContext.getFileContext(conf);
+      fs = FileSystem.get(conf);
+
+      // Create initial test files
+      final Path testDir = new Path("/symtest");
+      final Path target = new Path(testDir, "target");
+      final Path link = new Path(testDir, "link");
+      fs.mkdirs(testDir);
+      DFSTestUtil.createFile(fs, target, 1024, (short)3, 0xBEEFl);
+      fc.createSymlink(target, link, false);
+
+      // Non-super user to run commands with
+      final UserGroupInformation user = UserGroupInformation
+          .createRemoteUser("myuser");
+
+      // Case 1: parent directory is read-only
+      fs.setPermission(testDir, new FsPermission((short)0555));
+      try {
+        user.doAs(new PrivilegedExceptionAction<Object>() {
+          @Override
+          public Object run() throws IOException {
+            FileContext myfc = FileContext.getFileContext(conf);
+            myfc.delete(link, false);
+            return null;
+          }
+        });
+        fail("Deleted symlink without write permissions on parent!");
+      } catch (AccessControlException e) {
+        GenericTestUtils.assertExceptionContains("Permission denied", e);
+      }
+
+      // Case 2: target is not readable
+      fs.setPermission(target, new FsPermission((short)0000));
+      try {
+        user.doAs(new PrivilegedExceptionAction<Object>() {
+          @Override
+          public Object run() throws IOException {
+            FileContext myfc = FileContext.getFileContext(conf);
+            myfc.open(link).read();
+            return null;
+          }
+        });
+        fail("Read link target even though target does not have" +
+            " read permissions!");
+      } catch (IOException e) {
+        GenericTestUtils.assertExceptionContains("Permission denied", e);
+      }
+
+      // Case 3: parent directory is read/write
+      fs.setPermission(testDir, new FsPermission((short)0777));
+      user.doAs(new PrivilegedExceptionAction<Object>() {
+        @Override
+        public Object run() throws IOException {
+          FileContext myfc = FileContext.getFileContext(conf);
+          myfc.delete(link, false);
+          return null;
+        }
+      });
+      // Make sure only the link was deleted
+      assertTrue("Target should not have been deleted!",
+          fc.util().exists(target));
+      assertFalse("Link should have been deleted!",
+          fc.util().exists(link));
+    } finally {
+      try {
+        if(fs != null) fs.close();
+      } catch(Exception e) {
+        LOG.error(StringUtils.stringifyException(e));
+      }
+      try {
+        if(cluster != null) cluster.shutdown();
+      } catch(Exception e) {
+        LOG.error(StringUtils.stringifyException(e));
+      }
+    }
+  }
 }