
HDFS-4765. Permission check of symlink deletion incorrectly throws UnresolvedLinkException. Contributed by Andrew Wang.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1481976 13f79535-47bb-0310-9956-ffa450edef68
Aaron Myers, 12 years ago
commit 6521b5ee42
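
Context for the change, in brief: with permissions enabled, the NameNode's delete path ran its permission check with symlink resolution turned on, so an attempt to delete a symlink could surface an UnresolvedLinkException from the check itself instead of simply verifying write access on the link's parent. A minimal client-side sketch of the scenario (hypothetical paths; it assumes a running HDFS with dfs.permissions.enabled=true and FileContext symlink support):

    // Hypothetical repro sketch; the /symtest paths are illustrative only.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileContext;
    import org.apache.hadoop.fs.Path;

    public class SymlinkDeleteRepro {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileContext fc = FileContext.getFileContext(conf);
        Path target = new Path("/symtest/target");  // assumed to exist
        Path link = new Path("/symtest/link");
        fc.createSymlink(target, link, false);
        // Before this patch, the server-side permission check resolved the
        // link, so this delete could fail with UnresolvedLinkException (or
        // be retried against the target by the client-side link resolver).
        // After the patch, only the parent directory's write bit is checked
        // and the link itself is removed.
        fc.delete(link, false);
      }
    }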

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -989,6 +989,9 @@ Release 2.0.5-beta - UNRELEASED
    HDFS-4533. start-dfs.sh ignores additional parameters besides -upgrade.
    (Fengdong Yu via suresh)
 
+    HDFS-4765. Permission check of symlink deletion incorrectly throws
+    UnresolvedLinkException. (Andrew Wang via atm)
+
Release 2.0.4-alpha - 2013-04-25
 
  INCOMPATIBLE CHANGES

+ 18 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -2849,7 +2849,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
  /**
   * Remove the indicated file from namespace.
   * 
-   * @see ClientProtocol#delete(String, boolean) for detailed descriptoin and 
+   * @see ClientProtocol#delete(String, boolean) for detailed description and 
   * description of exceptions
   */
  boolean delete(String src, boolean recursive)
@@ -2911,7 +2911,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
        throw new IOException(src + " is non empty");
      }
      if (enforcePermission && isPermissionEnabled) {
-        checkPermission(pc, src, false, null, FsAction.WRITE, null, FsAction.ALL);
+        checkPermission(pc, src, false, null, FsAction.WRITE, null,
+            FsAction.ALL, false);
      }
      // Unlink the target directory from directory tree
      if (!dir.delete(src, collectedBlocks, removedINodes)) {
@@ -4800,13 +4801,27 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
  private void checkPermission(FSPermissionChecker pc,
      String path, boolean doCheckOwner, FsAction ancestorAccess,
      FsAction parentAccess, FsAction access, FsAction subAccess)
+      throws AccessControlException, UnresolvedLinkException {
+    checkPermission(pc, path, doCheckOwner, ancestorAccess,
+        parentAccess, access, subAccess, true);
+  }
+
+  /**
+   * Check whether the current user has permissions to access the path. For more
+   * details of the parameters, see
+   * {@link FSPermissionChecker#checkPermission()}.
+   */
+  private void checkPermission(FSPermissionChecker pc,
+      String path, boolean doCheckOwner, FsAction ancestorAccess,
+      FsAction parentAccess, FsAction access, FsAction subAccess,
+      boolean resolveLink)
      throws AccessControlException, UnresolvedLinkException {
    if (!pc.isSuperUser()) {
      dir.waitForReady();
      readLock();
      try {
        pc.checkPermission(path, dir.rootDir, doCheckOwner, ancestorAccess,
-            parentAccess, access, subAccess);
+            parentAccess, access, subAccess, resolveLink);
      } finally {
        readUnlock();
      }
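
Note on the FSNamesystem change: Java has no default parameter values, so the patch keeps the existing seven-argument checkPermission() and adds an eight-argument overload taking resolveLink; the old signature delegates with resolveLink=true, leaving every existing caller's behavior untouched while the delete path alone passes false. A toy sketch of the idiom (names here are illustrative, not Hadoop's):

    // Illustrative delegation idiom only; not Hadoop code.
    class Checker {
      // Legacy signature: unchanged callers keep resolving symlinks.
      void check(String path) {
        check(path, true);
      }
      // New overload: callers that must act on the link itself pass false.
      void check(String path, boolean resolveLink) {
        // ... run the actual permission check ...
      }
    }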

+ 33 - 30
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java

@@ -122,6 +122,8 @@ class FSPermissionChecker {
   * @param subAccess If path is a directory,
   * it is the access required of the path and all the sub-directories.
   * If path is not a directory, there is no effect.
+   * @param resolveLink whether to resolve the final path component if it is
+   * a symlink
   * @throws AccessControlException
   * @throws UnresolvedLinkException
   * 
@@ -130,7 +132,7 @@ class FSPermissionChecker {
   */
  void checkPermission(String path, INodeDirectory root, boolean doCheckOwner,
      FsAction ancestorAccess, FsAction parentAccess, FsAction access,
-      FsAction subAccess) 
+      FsAction subAccess, boolean resolveLink)
      throws AccessControlException, UnresolvedLinkException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("ACCESS CHECK: " + this
@@ -138,38 +140,39 @@ class FSPermissionChecker {
           + ", ancestorAccess=" + ancestorAccess
           + ", ancestorAccess=" + ancestorAccess
           + ", parentAccess=" + parentAccess
           + ", parentAccess=" + parentAccess
           + ", access=" + access
           + ", access=" + access
-          + ", subAccess=" + subAccess);
+          + ", subAccess=" + subAccess
+          + ", resolveLink=" + resolveLink);
    }
    // check if (parentAccess != null) && file exists, then check sb
-      // Resolve symlinks, the check is performed on the link target.
-      final INodesInPath inodesInPath = root.getINodesInPath(path, true); 
-      final Snapshot snapshot = inodesInPath.getPathSnapshot();
-      final INode[] inodes = inodesInPath.getINodes();
-      int ancestorIndex = inodes.length - 2;
-      for(; ancestorIndex >= 0 && inodes[ancestorIndex] == null;
-          ancestorIndex--);
-      checkTraverse(inodes, ancestorIndex, snapshot);
+    // If resolveLink, the check is performed on the link target.
+    final INodesInPath inodesInPath = root.getINodesInPath(path, resolveLink);
+    final Snapshot snapshot = inodesInPath.getPathSnapshot();
+    final INode[] inodes = inodesInPath.getINodes();
+    int ancestorIndex = inodes.length - 2;
+    for(; ancestorIndex >= 0 && inodes[ancestorIndex] == null;
+        ancestorIndex--);
+    checkTraverse(inodes, ancestorIndex, snapshot);
 
-      final INode last = inodes[inodes.length - 1];
-      if (parentAccess != null && parentAccess.implies(FsAction.WRITE)
-          && inodes.length > 1 && last != null) {
-        checkStickyBit(inodes[inodes.length - 2], last, snapshot);
-      }
-      if (ancestorAccess != null && inodes.length > 1) {
-        check(inodes, ancestorIndex, snapshot, ancestorAccess);
-      }
-      if (parentAccess != null && inodes.length > 1) {
-        check(inodes, inodes.length - 2, snapshot, parentAccess);
-      }
-      if (access != null) {
-        check(last, snapshot, access);
-      }
-      if (subAccess != null) {
-        checkSubAccess(last, snapshot, subAccess);
-      }
-      if (doCheckOwner) {
-        checkOwner(last, snapshot);
-      }
+    final INode last = inodes[inodes.length - 1];
+    if (parentAccess != null && parentAccess.implies(FsAction.WRITE)
+        && inodes.length > 1 && last != null) {
+      checkStickyBit(inodes[inodes.length - 2], last, snapshot);
+    }
+    if (ancestorAccess != null && inodes.length > 1) {
+      check(inodes, ancestorIndex, snapshot, ancestorAccess);
+    }
+    if (parentAccess != null && inodes.length > 1) {
+      check(inodes, inodes.length - 2, snapshot, parentAccess);
+    }
+    if (access != null) {
+      check(last, snapshot, access);
+    }
+    if (subAccess != null) {
+      checkSubAccess(last, snapshot, subAccess);
+    }
+    if (doCheckOwner) {
+      checkOwner(last, snapshot);
+    }
  }
 
  /** Guarded by {@link FSNamesystem#readLock()} */
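
What the new resolveLink flag actually changes in FSPermissionChecker is which inode ends up as the final component of the resolved path: with resolution on, the access and owner checks apply to the link target; with it off, they apply to the link inode itself, which is the right behavior for deletion, where only the parent directory's write bit (and sticky bit) matter. A self-contained toy model of that distinction (this is not Hadoop's INode API):

    import java.util.HashMap;
    import java.util.Map;

    // Toy model: which path component does the final check apply to?
    public class ResolveLinkDemo {
      // Maps a symlink path to its target path.
      static final Map<String, String> links = new HashMap<String, String>();

      static String lastComponent(String path, boolean resolveLink) {
        String target = links.get(path);
        if (target != null && resolveLink) {
          return lastComponent(target, true);  // follow the link chain
        }
        return path;  // the link itself, or a regular file
      }

      public static void main(String[] args) {
        links.put("/symtest/link", "/symtest/target");
        // open(): the check applies to the target's mode bits.
        System.out.println(lastComponent("/symtest/link", true));
        // delete(): the link is the final component; its target's
        // permissions are never consulted.
        System.out.println(lastComponent("/symtest/link", false));
      }
    }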

+ 93 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/security/TestPermission.java

@@ -20,8 +20,10 @@ package org.apache.hadoop.security;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.Random;
 
 import org.apache.commons.logging.Log;
@@ -30,14 +32,17 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Test;
 
@@ -310,4 +315,92 @@ public class TestPermission {
      return false;
    }
  }
+
+  @Test(timeout = 30000)
+  public void testSymlinkPermissions() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
+    conf.set(FsPermission.UMASK_LABEL, "000");
+    MiniDFSCluster cluster = null;
+    FileSystem fs = null;
+
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+      cluster.waitActive();
+      FileContext fc = FileContext.getFileContext(conf);
+      fs = FileSystem.get(conf);
+
+      // Create initial test files
+      final Path testDir = new Path("/symtest");
+      final Path target = new Path(testDir, "target");
+      final Path link = new Path(testDir, "link");
+      fs.mkdirs(testDir);
+      DFSTestUtil.createFile(fs, target, 1024, (short)3, 0xBEEFL);
+      fc.createSymlink(target, link, false);
+
+      // Non-super user to run commands with
+      final UserGroupInformation user = UserGroupInformation
+          .createRemoteUser("myuser");
+
+      // Case 1: parent directory is read-only
+      fs.setPermission(testDir, new FsPermission((short)0555));
+      try {
+        user.doAs(new PrivilegedExceptionAction<Object>() {
+          @Override
+          public Object run() throws IOException {
+            FileContext myfc = FileContext.getFileContext(conf);
+            myfc.delete(link, false);
+            return null;
+          }
+        });
+        fail("Deleted symlink without write permissions on parent!");
+      } catch (AccessControlException e) {
+        GenericTestUtils.assertExceptionContains("Permission denied", e);
+      }
+
+      // Case 2: target is not readable
+      fs.setPermission(target, new FsPermission((short)0000));
+      try {
+        user.doAs(new PrivilegedExceptionAction<Object>() {
+          @Override
+          public Object run() throws IOException {
+            FileContext myfc = FileContext.getFileContext(conf);
+            myfc.open(link).read();
+            return null;
+          }
+        });
+        fail("Read link target even though target does not have" +
+            " read permissions!");
+      } catch (IOException e) {
+        GenericTestUtils.assertExceptionContains("Permission denied", e);
+      }
+
+      // Case 3: parent directory is read/write
+      fs.setPermission(testDir, new FsPermission((short)0777));
+      user.doAs(new PrivilegedExceptionAction<Object>() {
+        @Override
+        public Object run() throws IOException {
+          FileContext myfc = FileContext.getFileContext(conf);
+          myfc.delete(link, false);
+          return null;
+        }
+      });
+      // Make sure only the link was deleted
+      assertTrue("Target should not have been deleted!",
+          fc.util().exists(target));
+      assertFalse("Link should have been deleted!",
+          fc.util().exists(link));
+    } finally {
+      try {
+        if(fs != null) fs.close();
+      } catch(Exception e) {
+        LOG.error(StringUtils.stringifyException(e));
+      }
+      try {
+        if(cluster != null) cluster.shutdown();
+      } catch(Exception e) {
+        LOG.error(StringUtils.stringifyException(e));
+      }
+    }
+  }
 }
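
To run just this regression test from the module directory, the standard Surefire filter should work (assuming the usual Maven setup for this branch):

    cd hadoop-hdfs-project/hadoop-hdfs
    mvn test -Dtest=TestPermission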