Explorar el Código

HDFS-6099. HDFS file system limits not enforced on renames. Contributed by Chris Nauroth.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1579122 13f79535-47bb-0310-9956-ffa450edef68
Chris Nauroth hace 11 años
padre
commit
2b03ae9421

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -647,6 +647,8 @@ Release 2.4.0 - UNRELEASED
     HDFS-6117. Print file path information in FileNotFoundException on INode
     ID mismatch. (suresh)
 
+    HDFS-6099. HDFS file system limits not enforced on renames. (cnauroth)
+
   BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS
 
     HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9)

+ 33 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -643,6 +643,7 @@ public class FSDirectory implements Closeable {
     }
     
     // Ensure dst has quota to accommodate rename
+    verifyFsLimitsForRename(srcIIP, dstIIP);
     verifyQuotaForRename(srcIIP.getINodes(), dstIIP.getINodes());
     
     boolean added = false;
@@ -894,6 +895,7 @@ public class FSDirectory implements Closeable {
     }
 
     // Ensure dst has quota to accommodate rename
+    verifyFsLimitsForRename(srcIIP, dstIIP);
     verifyQuotaForRename(srcIIP.getINodes(), dstIIP.getINodes());
 
     INode srcChild = srcIIP.getLastINode();
@@ -2134,6 +2136,27 @@ public class FSDirectory implements Closeable {
         delta.get(Quota.DISKSPACE), src[i - 1]);
   }
 
+  /**
+   * Checks file system limits (max component length and max directory items)
+   * during a rename operation.
+   *
+   * @param srcIIP INodesInPath containing every inode in the rename source
+   * @param dstIIP INodesInPath containing every inode in the rename destination
+   * @throws PathComponentTooLongException child's name is too long.
+   * @throws MaxDirectoryItemsExceededException too many children.
+   */
+  private void verifyFsLimitsForRename(INodesInPath srcIIP, INodesInPath dstIIP)
+      throws PathComponentTooLongException, MaxDirectoryItemsExceededException {
+    byte[] dstChildName = dstIIP.getLastLocalName();
+    INode[] dstInodes = dstIIP.getINodes();
+    int pos = dstInodes.length - 1;
+    verifyMaxComponentLength(dstChildName, dstInodes, pos);
+    // Do not enforce max directory items if renaming within same directory.
+    if (srcIIP.getINode(-2) != dstIIP.getINode(-2)) {
+      verifyMaxDirItems(dstInodes, pos);
+    }
+  }
+
   /** Verify if the snapshot name is legal. */
   void verifySnapshotName(String snapshotName, String path)
       throws PathComponentTooLongException {
@@ -2159,10 +2182,14 @@ public class FSDirectory implements Closeable {
 
   /**
    * Verify child's name for fs limit.
+   *
+   * @param childName byte[] containing new child name
+   * @param parentPath Object either INode[] or String containing parent path
+   * @param pos int position of new child in path
    * @throws PathComponentTooLongException child's name is too long.
    */
-  void verifyMaxComponentLength(byte[] childName, Object parentPath, int pos)
-      throws PathComponentTooLongException {
+  private void verifyMaxComponentLength(byte[] childName, Object parentPath,
+      int pos) throws PathComponentTooLongException {
     if (maxComponentLength == 0) {
       return;
     }
@@ -2184,9 +2211,12 @@ public class FSDirectory implements Closeable {
 
   /**
    * Verify children size for fs limit.
+   *
+   * @param pathComponents INode[] containing full path of inodes to new child
+   * @param pos int position of new child in pathComponents
    * @throws MaxDirectoryItemsExceededException too many children.
    */
-  void verifyMaxDirItems(INode[] pathComponents, int pos)
+  private void verifyMaxDirItems(INode[] pathComponents, int pos)
       throws MaxDirectoryItemsExceededException {
 
     final INodeDirectory parent = pathComponents[pos-1].asDirectory();

+ 4 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java

@@ -75,6 +75,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.FSLimitException;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
@@ -358,7 +359,9 @@ class NameNodeRpcServer implements NamenodeProtocols {
         LeaseExpiredException.class,
         NSQuotaExceededException.class,
         DSQuotaExceededException.class,
-        AclException.class);
+        AclException.class,
+        FSLimitException.PathComponentTooLongException.class,
+        FSLimitException.MaxDirectoryItemsExceededException.class);
  }
 
   /** Allow access to the client RPC server for testing */

+ 101 - 41
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsLimits.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
+import static org.apache.hadoop.util.Time.now;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.mock;
@@ -29,14 +30,15 @@ import java.io.IOException;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.FSLimitException.MaxDirectoryItemsExceededException;
 import org.apache.hadoop.hdfs.protocol.FSLimitException.PathComponentTooLongException;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Before;
 import org.junit.Test;
@@ -50,7 +52,12 @@ public class TestFsLimits {
   static PermissionStatus perms
     = new PermissionStatus("admin", "admin", FsPermission.getDefault());
 
-  static INodeDirectory rootInode;
+  static private FSImage getMockFSImage() {
+    FSEditLog editLog = mock(FSEditLog.class);
+    FSImage fsImage = mock(FSImage.class);
+    when(fsImage.getEditLog()).thenReturn(editLog);
+    return fsImage;
+  }
 
   static private FSNamesystem getMockNamesystem() {
     FSNamesystem fsn = mock(FSNamesystem.class);
@@ -64,8 +71,9 @@ public class TestFsLimits {
   
   private static class MockFSDirectory extends FSDirectory {
     public MockFSDirectory() throws IOException {
-      super(new FSImage(conf), getMockNamesystem(), conf);
+      super(getMockFSImage(), getMockNamesystem(), conf);
       setReady(fsIsReady);
+      NameNode.initMetrics(conf, NamenodeRole.NAMENODE);
     }
   }
 
@@ -76,21 +84,18 @@ public class TestFsLimits {
              fileAsURI(new File(MiniDFSCluster.getBaseDirectory(),
                                 "namenode")).toString());
 
-    rootInode = new INodeDirectory(getMockNamesystem().allocateNewInodeId(),
-        INodeDirectory.ROOT_NAME, perms, 0L);
-    inodes = new INode[]{ rootInode, null };
     fs = null;
     fsIsReady = true;
   }
 
   @Test
   public void testNoLimits() throws Exception {
-    addChildWithName("1", null);
-    addChildWithName("22", null);
-    addChildWithName("333", null);
-    addChildWithName("4444", null);
-    addChildWithName("55555", null);
-    addChildWithName(HdfsConstants.DOT_SNAPSHOT_DIR,
+    mkdirs("/1", null);
+    mkdirs("/22", null);
+    mkdirs("/333", null);
+    mkdirs("/4444", null);
+    mkdirs("/55555", null);
+    mkdirs("/1/" + HdfsConstants.DOT_SNAPSHOT_DIR,
         HadoopIllegalArgumentException.class);
   }
 
@@ -98,33 +103,65 @@ public class TestFsLimits {
   public void testMaxComponentLength() throws Exception {
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_KEY, 2);
     
-    addChildWithName("1", null);
-    addChildWithName("22", null);
-    addChildWithName("333", PathComponentTooLongException.class);
-    addChildWithName("4444", PathComponentTooLongException.class);
+    mkdirs("/1", null);
+    mkdirs("/22", null);
+    mkdirs("/333", PathComponentTooLongException.class);
+    mkdirs("/4444", PathComponentTooLongException.class);
+  }
+
+  @Test
+  public void testMaxComponentLengthRename() throws Exception {
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_KEY, 2);
+
+    mkdirs("/5", null);
+    rename("/5", "/555", PathComponentTooLongException.class);
+    rename("/5", "/55", null);
+
+    mkdirs("/6", null);
+    deprecatedRename("/6", "/666", PathComponentTooLongException.class);
+    deprecatedRename("/6", "/66", null);
   }
 
   @Test
   public void testMaxDirItems() throws Exception {
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_KEY, 2);
     
-    addChildWithName("1", null);
-    addChildWithName("22", null);
-    addChildWithName("333", MaxDirectoryItemsExceededException.class);
-    addChildWithName("4444", MaxDirectoryItemsExceededException.class);
+    mkdirs("/1", null);
+    mkdirs("/22", null);
+    mkdirs("/333", MaxDirectoryItemsExceededException.class);
+    mkdirs("/4444", MaxDirectoryItemsExceededException.class);
+  }
+
+  @Test
+  public void testMaxDirItemsRename() throws Exception {
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_KEY, 2);
+    
+    mkdirs("/1", null);
+    mkdirs("/2", null);
+
+    mkdirs("/2/A", null);
+    rename("/2/A", "/A", MaxDirectoryItemsExceededException.class);
+    rename("/2/A", "/1/A", null);
+
+    mkdirs("/2/B", null);
+    deprecatedRename("/2/B", "/B", MaxDirectoryItemsExceededException.class);
+    deprecatedRename("/2/B", "/1/B", null);
+
+    rename("/1", "/3", null);
+    deprecatedRename("/2", "/4", null);
   }
 
   @Test
   public void testMaxDirItemsLimits() throws Exception {
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_KEY, 0);
     try {
-      addChildWithName("1", null);
+      mkdirs("1", null);
     } catch (IllegalArgumentException e) {
       GenericTestUtils.assertExceptionContains("Cannot set dfs", e);
     }
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_KEY, 64*100*1024);
     try {
-      addChildWithName("1", null);
+      mkdirs("1", null);
     } catch (IllegalArgumentException e) {
       GenericTestUtils.assertExceptionContains("Cannot set dfs", e);
     }
@@ -135,10 +172,10 @@ public class TestFsLimits {
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_KEY, 3);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_KEY, 2);
     
-    addChildWithName("1", null);
-    addChildWithName("22", null);
-    addChildWithName("333", MaxDirectoryItemsExceededException.class);
-    addChildWithName("4444", PathComponentTooLongException.class);
+    mkdirs("/1", null);
+    mkdirs("/22", null);
+    mkdirs("/333", MaxDirectoryItemsExceededException.class);
+    mkdirs("/4444", PathComponentTooLongException.class);
   }
 
   @Test
@@ -147,32 +184,55 @@ public class TestFsLimits {
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_KEY, 2);
     fsIsReady = false;
     
-    addChildWithName(HdfsConstants.DOT_SNAPSHOT_DIR,
+    mkdirs("/1", null);
+    mkdirs("/22", null);
+    mkdirs("/333", null);
+    mkdirs("/4444", null);
+    mkdirs("/1/" + HdfsConstants.DOT_SNAPSHOT_DIR,
         HadoopIllegalArgumentException.class);
-    addChildWithName("1", null);
-    addChildWithName("22", null);
-    addChildWithName("333", null);
-    addChildWithName("4444", null);
   }
 
-  private void addChildWithName(String name, Class<?> expected)
+  private void mkdirs(String name, Class<?> expected)
   throws Exception {
-    // have to create after the caller has had a chance to set conf values
-    if (fs == null) fs = new MockFSDirectory();
+    lazyInitFSDirectory();
+    Class<?> generated = null;
+    try {
+      fs.mkdirs(name, perms, false, now());
+    } catch (Throwable e) {
+      generated = e.getClass();
+    }
+    assertEquals(expected, generated);
+  }
 
-    INode child = new INodeDirectory(getMockNamesystem().allocateNewInodeId(),
-        DFSUtil.string2Bytes(name), perms, 0L);
-    
+  private void rename(String src, String dst, Class<?> expected)
+      throws Exception {
+    lazyInitFSDirectory();
     Class<?> generated = null;
     try {
-      fs.verifyMaxComponentLength(child.getLocalNameBytes(), inodes, 1);
-      fs.verifyMaxDirItems(inodes, 1);
-      fs.verifyINodeName(child.getLocalNameBytes());
+      fs.renameTo(src, dst, false, new Rename[] { });
+    } catch (Throwable e) {
+      generated = e.getClass();
+    }
+    assertEquals(expected, generated);
+  }
 
-      rootInode.addChild(child);
+  @SuppressWarnings("deprecation")
+  private void deprecatedRename(String src, String dst, Class<?> expected)
+      throws Exception {
+    lazyInitFSDirectory();
+    Class<?> generated = null;
+    try {
+      fs.renameTo(src, dst, false);
     } catch (Throwable e) {
       generated = e.getClass();
     }
     assertEquals(expected, generated);
   }
+
+  private static void lazyInitFSDirectory() throws IOException {
+    // have to create after the caller has had a chance to set conf values
+    if (fs == null) {
+      fs = new MockFSDirectory();
+    }
+  }
 }