
Command hdfs dfs -rm -r can't remove empty directory. Contributed by Yongjun Zhang.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1594036 13f79535-47bb-0310-9956-ffa450edef68
Andrew Wang 11 years ago
parent
commit
e9c6840b24
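
This change lets "hdfs dfs -rm -r" remove an empty directory when the caller has write and execute (search) access on the parent directory: an empty directory has no children whose permissions need checking, so the sub-tree permission check is skipped for it. A minimal sketch of the fixed behavior through the public FileSystem API, assuming a reachable HDFS cluster; the path and configuration below are hypothetical and not part of the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RmrEmptyDirSketch {
  public static void main(String[] args) throws Exception {
    // Assumes fs.defaultFS points at an HDFS cluster (hypothetical setup).
    FileSystem fs = FileSystem.get(new Configuration());

    Path emptyDir = new Path("/tmp/emptyDir");   // hypothetical path
    fs.mkdirs(emptyDir);

    // Programmatic equivalent of "hdfs dfs -rm -r /tmp/emptyDir": with this
    // patch, recursively deleting an empty directory is no longer rejected
    // by the sub-tree (subAccess) permission check.
    boolean deleted = fs.delete(emptyDir, true);
    System.out.println("deleted = " + deleted);
  }
}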

+ 4 - 1
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -440,12 +440,15 @@ Release 2.5.0 - UNRELEASED
     HDFS-6337. Setfacl testcase is failing due to dash character in username
     in TestAclCLI (umamahesh)
 
-    HDFS-5381. ExtendedBlock#hashCode should use both blockId and block pool ID    
+    HDFS-5381. ExtendedBlock#hashCode should use both blockId and block pool ID
     (Benoy Antony via Colin Patrick McCabe)
 
     HDFS-6240. WebImageViewer returns 404 if LISTSTATUS to an empty directory.
     (Akira Ajisaka via wheat9)
 
+    HDFS-6351. Command hdfs dfs -rm -r can't remove empty directory.
+    (Yongjun Zhang via wang)
+
 Release 2.4.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 15 - 10
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -139,6 +139,7 @@ import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
@@ -3243,10 +3244,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       // Rename does not operates on link targets
       // Do not resolveLink when checking permissions of src and dst
       // Check write access to parent of src
-      checkPermission(pc, src, false, null, FsAction.WRITE, null, null, false);
+      checkPermission(pc, src, false, null, FsAction.WRITE, null, null,
+          false, false);
       // Check write access to ancestor of dst
       checkPermission(pc, actualdst, false, FsAction.WRITE, null, null, null,
-          false);
+          false, false);
     }
 
     if (dir.renameTo(src, dst, logRetryCache)) {
@@ -3307,9 +3309,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       // Rename does not operates on link targets
       // Do not resolveLink when checking permissions of src and dst
       // Check write access to parent of src
-      checkPermission(pc, src, false, null, FsAction.WRITE, null, null, false);
+      checkPermission(pc, src, false, null, FsAction.WRITE, null, null, false,
+          false);
       // Check write access to ancestor of dst
-      checkPermission(pc, dst, false, FsAction.WRITE, null, null, null, false);
+      checkPermission(pc, dst, false, FsAction.WRITE, null, null, null, false,
+          false);
     }
 
     dir.renameTo(src, dst, logRetryCache, options);
@@ -3389,11 +3393,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       checkNameNodeSafeMode("Cannot delete " + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       if (!recursive && dir.isNonEmptyDirectory(src)) {
-        throw new IOException(src + " is non empty");
+        throw new PathIsNotEmptyDirectoryException(src + " is non empty");
       }
       if (enforcePermission && isPermissionEnabled) {
         checkPermission(pc, src, false, null, FsAction.WRITE, null,
-            FsAction.ALL, false);
+            FsAction.ALL, true, false);
       }
       // Unlink the target directory from directory tree
       if (!dir.delete(src, collectedBlocks, removedINodes, logRetryCache)) {
@@ -3545,7 +3549,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       checkOperation(OperationCategory.READ);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       if (isPermissionEnabled) {
-        checkPermission(pc, src, false, null, null, null, null, resolveLink);
+        checkPermission(pc, src, false, null, null, null, null, false,
+            resolveLink);
       }
       stat = dir.getFileInfo(src, resolveLink);
     } catch (AccessControlException e) {
@@ -5544,7 +5549,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       FsAction parentAccess, FsAction access, FsAction subAccess)
       throws AccessControlException, UnresolvedLinkException {
         checkPermission(pc, path, doCheckOwner, ancestorAccess,
-            parentAccess, access, subAccess, true);
+            parentAccess, access, subAccess, false, true);
   }
 
   /**
@@ -5555,14 +5560,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   private void checkPermission(FSPermissionChecker pc,
       String path, boolean doCheckOwner, FsAction ancestorAccess,
       FsAction parentAccess, FsAction access, FsAction subAccess,
-      boolean resolveLink)
+      boolean ignoreEmptyDir, boolean resolveLink)
       throws AccessControlException, UnresolvedLinkException {
     if (!pc.isSuperUser()) {
       dir.waitForReady();
       readLock();
       try {
         pc.checkPermission(path, dir, doCheckOwner, ancestorAccess,
-            parentAccess, access, subAccess, resolveLink);
+            parentAccess, access, subAccess, ignoreEmptyDir, resolveLink);
       } finally {
         readUnlock();
       }
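
Two behavioral notes on the FSNamesystem changes above: a non-recursive delete of a non-empty directory now raises the more specific PathIsNotEmptyDirectoryException instead of a bare IOException, and the delete path asks checkPermission to ignore empty directories. A hedged client-side sketch of the first point follows; it catches plain IOException because how the server-side exception is wrapped by the RPC layer is not assumed here, and the paths are hypothetical.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class NonRecursiveDeleteSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());

    Path dir = new Path("/tmp/nonEmptyDir");      // hypothetical path
    fs.mkdirs(new Path(dir, "child"));            // make the directory non-empty

    try {
      // A non-recursive delete of a non-empty directory is still refused; the
      // NameNode now reports it as PathIsNotEmptyDirectoryException.
      fs.delete(dir, false);
    } catch (IOException e) {
      System.out.println("delete refused: " + e.getMessage());
    }
  }
}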

+ 12 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.permission.AclEntryScope;
 import org.apache.hadoop.fs.permission.AclEntryType;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.util.ReadOnlyList;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
@@ -136,6 +137,7 @@ class FSPermissionChecker {
    * @param subAccess If path is a directory,
    * it is the access required of the path and all the sub-directories.
    * If path is not a directory, there is no effect.
+   * @param ignoreEmptyDir Ignore permission checking for empty directory?
    * @param resolveLink whether to resolve the final path component if it is
    * a symlink
    * @throws AccessControlException
@@ -146,7 +148,7 @@ class FSPermissionChecker {
    */
   void checkPermission(String path, FSDirectory dir, boolean doCheckOwner,
       FsAction ancestorAccess, FsAction parentAccess, FsAction access,
-      FsAction subAccess, boolean resolveLink)
+      FsAction subAccess, boolean ignoreEmptyDir, boolean resolveLink)
       throws AccessControlException, UnresolvedLinkException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("ACCESS CHECK: " + this
@@ -155,6 +157,7 @@ class FSPermissionChecker {
           + ", parentAccess=" + parentAccess
           + ", access=" + access
           + ", subAccess=" + subAccess
+          + ", ignoreEmptyDir=" + ignoreEmptyDir
           + ", resolveLink=" + resolveLink);
     }
     // check if (parentAccess != null) && file exists, then check sb
@@ -182,7 +185,7 @@ class FSPermissionChecker {
       check(last, snapshotId, access);
     }
     if (subAccess != null) {
-      checkSubAccess(last, snapshotId, subAccess);
+      checkSubAccess(last, snapshotId, subAccess, ignoreEmptyDir);
     }
     if (doCheckOwner) {
       checkOwner(last, snapshotId);
@@ -207,8 +210,8 @@ class FSPermissionChecker {
   }
 
   /** Guarded by {@link FSNamesystem#readLock()} */
-  private void checkSubAccess(INode inode, int snapshotId, FsAction access
-      ) throws AccessControlException {
+  private void checkSubAccess(INode inode, int snapshotId, FsAction access,
+      boolean ignoreEmptyDir) throws AccessControlException {
     if (inode == null || !inode.isDirectory()) {
       return;
     }
@@ -216,9 +219,12 @@ class FSPermissionChecker {
     Stack<INodeDirectory> directories = new Stack<INodeDirectory>();
     for(directories.push(inode.asDirectory()); !directories.isEmpty(); ) {
       INodeDirectory d = directories.pop();
-      check(d, snapshotId, access);
+      ReadOnlyList<INode> cList = d.getChildrenList(snapshotId);
+      if (!(cList.isEmpty() && ignoreEmptyDir)) {
+        check(d, snapshotId, access);
+      }
 
-      for(INode child : d.getChildrenList(snapshotId)) {
+      for(INode child : cList) {
         if (child.isDirectory()) {
           directories.push(child.asDirectory());
         }
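
The core of the fix is in checkSubAccess above: when ignoreEmptyDir is true, a directory whose children list is empty is not checked against the requested FsAction, so removing an empty directory no longer requires any access on the directory itself. The following standalone sketch mirrors that traversal rule with plain Java collections; the Dir type and the "accessible" flag are illustrative stand-ins, not HDFS APIs.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class SubAccessSketch {
  // Illustrative stand-in for an INodeDirectory: a name, an "accessible" bit
  // that plays the role of the FsAction check, and child directories.
  static class Dir {
    final String name;
    final boolean accessible;
    final List<Dir> children = new ArrayList<>();
    Dir(String name, boolean accessible) {
      this.name = name;
      this.accessible = accessible;
    }
  }

  static void checkSubAccess(Dir root, boolean ignoreEmptyDir) {
    Deque<Dir> directories = new ArrayDeque<>();
    directories.push(root);
    while (!directories.isEmpty()) {
      Dir d = directories.pop();
      // The rule the patch adds: skip the check entirely for an empty
      // directory when ignoreEmptyDir is set.
      if (!(d.children.isEmpty() && ignoreEmptyDir) && !d.accessible) {
        throw new SecurityException("access denied on " + d.name);
      }
      for (Dir child : d.children) {
        directories.push(child);
      }
    }
  }

  public static void main(String[] args) {
    Dir emptyButRestricted = new Dir("userA/userB", false);
    checkSubAccess(emptyButRestricted, true);   // passes: the empty dir is skipped
    System.out.println("empty directory ignored, as the patch intends");
  }
}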

+ 27 - 7
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPermission.java

@@ -429,6 +429,7 @@ public class TestDFSPermission {
       short[] ancestorPermission, short[] parentPermission,
       short[] filePermission, Path[] parentDirs, Path[] files, Path[] dirs)
       throws Exception {
+    boolean[] isDirEmpty = new boolean[NUM_TEST_PERMISSIONS];
     login(SUPERUSER);
     for (int i = 0; i < NUM_TEST_PERMISSIONS; i++) {
       create(OpType.CREATE, files[i]);
@@ -441,6 +442,8 @@ public class TestDFSPermission {
       FsPermission fsPermission = new FsPermission(filePermission[i]);
       fs.setPermission(files[i], fsPermission);
       fs.setPermission(dirs[i], fsPermission);
+
+      isDirEmpty[i] = (fs.listStatus(dirs[i]).length == 0);
     }
 
     login(ugi);
@@ -461,7 +464,7 @@ public class TestDFSPermission {
           parentPermission[i], ancestorPermission[next], parentPermission[next]);
       testDeleteFile(ugi, files[i], ancestorPermission[i], parentPermission[i]);
       testDeleteDir(ugi, dirs[i], ancestorPermission[i], parentPermission[i],
-          filePermission[i], null);
+          filePermission[i], null, isDirEmpty[i]);
     }
 
     // test non existent file
@@ -924,7 +927,8 @@ public class TestDFSPermission {
   }
 
   /* A class that verifies the permission checking is correct for
-   * directory deletion */
+   * directory deletion
+   */
   private class DeleteDirPermissionVerifier extends DeletePermissionVerifier {
     private short[] childPermissions;
 
@@ -958,6 +962,17 @@ public class TestDFSPermission {
     }
   }
 
+  /* A class that verifies the permission checking is correct for
+   * empty-directory deletion
+   */
+  private class DeleteEmptyDirPermissionVerifier extends DeleteDirPermissionVerifier {
+    @Override
+    void setOpPermission() {
+      this.opParentPermission = SEARCH_MASK | WRITE_MASK;
+      this.opPermission = NULL_MASK;
+    }
+  }
+
   final DeletePermissionVerifier fileDeletionVerifier =
     new DeletePermissionVerifier();
 
@@ -971,14 +986,19 @@ public class TestDFSPermission {
   final DeleteDirPermissionVerifier dirDeletionVerifier =
     new DeleteDirPermissionVerifier();
 
+  final DeleteEmptyDirPermissionVerifier emptyDirDeletionVerifier =
+      new DeleteEmptyDirPermissionVerifier();
+
   /* test if the permission checking of directory deletion is correct */
   private void testDeleteDir(UserGroupInformation ugi, Path path,
       short ancestorPermission, short parentPermission, short permission,
-      short[] childPermissions) throws Exception {
-    dirDeletionVerifier.set(path, ancestorPermission, parentPermission,
-        permission, childPermissions);
-    dirDeletionVerifier.verifyPermission(ugi);
-
+      short[] childPermissions,
+      final boolean isDirEmpty) throws Exception {
+    DeleteDirPermissionVerifier ddpv = isDirEmpty?
+        emptyDirDeletionVerifier : dirDeletionVerifier;
+    ddpv.set(path, ancestorPermission, parentPermission, permission,
+        childPermissions);
+    ddpv.verifyPermission(ugi);
   }
 
   /* log into dfs as the given user */

+ 274 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFsShellPermission.java

@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.URI;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Test;
+
+/**
+ * This test covers privilege related aspects of FsShell
+ *
+ */
+public class TestFsShellPermission {
+
+  static private final String TEST_ROOT = "/testroot";
+
+  static UserGroupInformation createUGI(String ownername, String groupName) {
+    return UserGroupInformation.createUserForTesting(ownername,
+        new String[]{groupName});
+  }
+
+  private class FileEntry {
+    private String path;
+    private boolean isDir;
+    private String owner;
+    private String group;
+    private String permission;
+    public FileEntry(String path, boolean isDir,
+        String owner, String group, String permission) {
+      this.path = path;
+      this.isDir = isDir;
+      this.owner = owner;
+      this.group = group;
+      this.permission = permission;
+    }
+    String getPath() { return path; }
+    boolean isDirectory() { return isDir; }
+    String getOwner() { return owner; }
+    String getGroup() { return group; }
+    String getPermission() { return permission; }
+  }
+
+  private void createFiles(FileSystem fs, String topdir,
+      FileEntry[] entries) throws IOException {
+    for (FileEntry entry : entries) {
+      String newPathStr = topdir + "/" + entry.getPath();
+      Path newPath = new Path(newPathStr);
+      if (entry.isDirectory()) {
+        fs.mkdirs(newPath);
+      } else {
+        FileSystemTestHelper.createFile(fs,  newPath);
+      }
+      fs.setPermission(newPath, new FsPermission(entry.getPermission()));
+      fs.setOwner(newPath, entry.getOwner(), entry.getGroup());
+    }
+  }
+
+  /** delete directory and everything underneath it.*/
+  private static void deldir(FileSystem fs, String topdir) throws IOException {
+    fs.delete(new Path(topdir), true);
+  }
+
+  static String execCmd(FsShell shell, final String[] args) throws Exception {
+    ByteArrayOutputStream baout = new ByteArrayOutputStream();
+    PrintStream out = new PrintStream(baout, true);
+    PrintStream old = System.out;
+    System.setOut(out);
+    int ret = shell.run(args);
+    out.close();
+    System.setOut(old);
+    return String.valueOf(ret);
+  }
+
+  /*
+   * Each instance of TestDeleteHelper captures one testing scenario.
+   *
+   * It creates all the files listed in fileEntries, and then, as user doAsUser,
+   * deletes the deleteEntry with the command and options given in cmdAndOptions.
+   *
+   * When expectedToDelete is true, the deleteEntry is expected to be deleted;
+   * otherwise, it's not expected to be deleted. At the end of test,
+   * the existence of deleteEntry is checked against expectedToDelete
+   * to ensure the command is finished with expected result
+   */
+  private class TestDeleteHelper {
+    private FileEntry[] fileEntries;
+    private FileEntry deleteEntry;
+    private String cmdAndOptions;
+    private boolean expectedToDelete;
+
+    final String doAsGroup;
+    final UserGroupInformation userUgi;
+
+    public TestDeleteHelper(
+        FileEntry[] fileEntries,
+        FileEntry deleteEntry,
+        String cmdAndOptions,
+        String doAsUser,
+        boolean expectedToDelete) {
+      this.fileEntries = fileEntries;
+      this.deleteEntry = deleteEntry;
+      this.cmdAndOptions = cmdAndOptions;
+      this.expectedToDelete = expectedToDelete;
+
+      doAsGroup = doAsUser.equals("hdfs")? "supergroup" : "users";
+      userUgi = createUGI(doAsUser, doAsGroup);
+    }
+
+    public void execute(Configuration conf, FileSystem fs) throws Exception {
+      fs.mkdirs(new Path(TEST_ROOT));
+
+      createFiles(fs, TEST_ROOT, fileEntries);
+      final FsShell fsShell = new FsShell(conf);
+      final String deletePath =  TEST_ROOT + "/" + deleteEntry.getPath();
+
+      String[] tmpCmdOpts = StringUtils.split(cmdAndOptions);
+      ArrayList<String> tmpArray = new ArrayList<String>(Arrays.asList(tmpCmdOpts));
+      tmpArray.add(deletePath);
+      final String[] cmdOpts = tmpArray.toArray(new String[tmpArray.size()]);
+      userUgi.doAs(new PrivilegedExceptionAction<String>() {
+        public String run() throws Exception {
+          return execCmd(fsShell, cmdOpts);
+        }
+      });
+
+      boolean deleted = !fs.exists(new Path(deletePath));
+      assertEquals(expectedToDelete, deleted);
+
+      deldir(fs, TEST_ROOT);
+    }
+  }
+
+  private TestDeleteHelper genDeleteEmptyDirHelper(final String cmdOpts,
+      final String targetPerm,
+      final String asUser,
+      boolean expectedToDelete) {
+    FileEntry[] files = {
+        new FileEntry("userA", true, "userA", "users", "755"),
+        new FileEntry("userA/userB", true, "userB", "users", targetPerm)
+    };
+    FileEntry deleteEntry = files[1];
+    return new TestDeleteHelper(files, deleteEntry, cmdOpts, asUser,
+        expectedToDelete);
+  }
+
+  // Expect target to be deleted
+  private TestDeleteHelper genRmrEmptyDirWithReadPerm() {
+    return genDeleteEmptyDirHelper("-rm -r", "744", "userA", true);
+  }
+
+  // Expect target to be deleted
+  private TestDeleteHelper genRmrEmptyDirWithNoPerm() {
+    return genDeleteEmptyDirHelper("-rm -r", "700", "userA", true);
+  }
+
+  // Expect target to be deleted
+  private TestDeleteHelper genRmrfEmptyDirWithNoPerm() {
+    return genDeleteEmptyDirHelper("-rm -r -f", "700", "userA", true);
+  }
+
+  private TestDeleteHelper genDeleteNonEmptyDirHelper(final String cmd,
+      final String targetPerm,
+      final String asUser,
+      boolean expectedToDelete) {
+    FileEntry[] files = {
+        new FileEntry("userA", true, "userA", "users", "755"),
+        new FileEntry("userA/userB", true, "userB", "users", targetPerm),
+        new FileEntry("userA/userB/xyzfile", false, "userB", "users",
+            targetPerm)
+    };
+    FileEntry deleteEntry = files[1];
+    return new TestDeleteHelper(files, deleteEntry, cmd, asUser,
+        expectedToDelete);
+  }
+
+  // Expect target not to be deleted
+  private TestDeleteHelper genRmrNonEmptyDirWithReadPerm() {
+    return genDeleteNonEmptyDirHelper("-rm -r", "744", "userA", false);
+  }
+
+  // Expect target not to be deleted
+  private TestDeleteHelper genRmrNonEmptyDirWithNoPerm() {
+    return genDeleteNonEmptyDirHelper("-rm -r", "700", "userA", false);
+  }
+
+  // Expect target to be deleted
+  private TestDeleteHelper genRmrNonEmptyDirWithAllPerm() {
+    return genDeleteNonEmptyDirHelper("-rm -r", "777", "userA", true);
+  }
+
+  // Expect target not to be deleted
+  private TestDeleteHelper genRmrfNonEmptyDirWithNoPerm() {
+    return genDeleteNonEmptyDirHelper("-rm -r -f", "700", "userA", false);
+  }
+
+  // Expect target to be deleted
+  public TestDeleteHelper genDeleteSingleFileNotAsOwner() throws Exception {
+    FileEntry[] files = {
+        new FileEntry("userA", true, "userA", "users", "755"),
+        new FileEntry("userA/userB", false, "userB", "users", "700")
+    };
+    FileEntry deleteEntry = files[1];
+    return new TestDeleteHelper(files, deleteEntry, "-rm -r", "userA", true);
+  }
+
+  @Test
+  public void testDelete() throws Exception {
+    Configuration conf = null;
+    MiniDFSCluster cluster = null;
+    try {
+      conf = new Configuration();
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+
+      String nnUri = FileSystem.getDefaultUri(conf).toString();
+      FileSystem fs = FileSystem.get(URI.create(nnUri), conf);
+
+      ArrayList<TestDeleteHelper> ta = new ArrayList<TestDeleteHelper>();
+
+      // Add empty dir tests
+      ta.add(genRmrEmptyDirWithReadPerm());
+      ta.add(genRmrEmptyDirWithNoPerm());
+      ta.add(genRmrfEmptyDirWithNoPerm());
+
+      // Add non-empty dir tests
+      ta.add(genRmrNonEmptyDirWithReadPerm());
+      ta.add(genRmrNonEmptyDirWithNoPerm());
+      ta.add(genRmrNonEmptyDirWithAllPerm());
+      ta.add(genRmrfNonEmptyDirWithNoPerm());
+
+      // Add single file test
+      ta.add(genDeleteSingleFileNotAsOwner());
+
+      // Run all tests
+      for(TestDeleteHelper t : ta) {
+        t.execute(conf,  fs);
+      }
+    } finally {
+      if (cluster != null) { cluster.shutdown(); }
+    }
+  }
+}
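
The execCmd helper in this new test drives FsShell programmatically, the same way a user would from the command line, capturing its console output and exit status. A hedged sketch of the same pattern outside a test, using ToolRunner; the target path is hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.util.ToolRunner;

public class FsShellRmrSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Programmatic equivalent of: hdfs dfs -rm -r /testroot/userA/userB
    int exitCode = ToolRunner.run(conf, new FsShell(),
        new String[] { "-rm", "-r", "/testroot/userA/userB" });
    System.out.println("exit code = " + exitCode);
  }
}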

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSPermissionChecker.java

@@ -393,14 +393,14 @@ public class TestFSPermissionChecker {
   private void assertPermissionGranted(UserGroupInformation user, String path,
       FsAction access) throws IOException {
     new FSPermissionChecker(SUPERUSER, SUPERGROUP, user).checkPermission(path,
-      dir, false, null, null, access, null, true);
+      dir, false, null, null, access, null, false, true);
   }
 
   private void assertPermissionDenied(UserGroupInformation user, String path,
       FsAction access) throws IOException {
     try {
       new FSPermissionChecker(SUPERUSER, SUPERGROUP, user).checkPermission(path,
-        dir, false, null, null, access, null, true);
+        dir, false, null, null, access, null, false, true);
       fail("expected AccessControlException for user + " + user + ", path = " +
         path + ", access = " + access);
     } catch (AccessControlException e) {