浏览代码

HDFS-761. Fix failure to process rename operation from edits log due to quota verification. Contributed by Suresh Srinivas.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/trunk@835110 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas 15 年之前
父节点
当前提交
b17118118d

+ 3 - 0
CHANGES.txt

@@ -539,3 +539,6 @@ Release 0.20.1 - 2009-09-01
 
     HDFS-525. The SimpleDateFormat object in ListPathsServlet is not thread
     safe. (Suresh Srinivas and cdouglas)
+
+    HDFS-761. Fix failure to process rename operation from edits log due to 
+    quota verification. (suresh)

+ 8 - 0
src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -1355,6 +1355,10 @@ class FSDirectory implements Closeable {
    */
   private void verifyQuota(INode[] inodes, int pos, long nsDelta, long dsDelta,
       INode commonAncestor) throws QuotaExceededException {
+    if (!ready) {
+      // Do not check quota if edits log is still being processed
+      return;
+    }
     if (nsDelta <= 0 && dsDelta <= 0) {
       // if quota is being freed or not being consumed
       return;
@@ -1392,6 +1396,10 @@ class FSDirectory implements Closeable {
    */
   private void verifyQuotaForRename(INode[] srcInodes, INode[]dstInodes)
       throws QuotaExceededException {
+    if (!ready) {
+      // Do not check quota if edits log is still being processed
+      return;
+    }
     INode srcInode = srcInodes[srcInodes.length - 1];
     INode commonAncestor = null;
     for(int i =0;srcInodes[i] == dstInodes[i]; i++) {

+ 74 - 4
src/test/hdfs/org/apache/hadoop/fs/TestHDFSFileContextMainOperations.java

@@ -23,7 +23,6 @@ import java.net.URISyntaxException;
 
 import javax.security.auth.login.LoginException;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -42,18 +41,31 @@ public class TestHDFSFileContextMainOperations extends
                                   FileContextMainOperationsBaseTest {
   private static MiniDFSCluster cluster;
   private static Path defaultWorkingDirectory;
+  private static HdfsConfiguration CONF = new HdfsConfiguration();
   
   @BeforeClass
   public static void clusterSetupAtBegining() throws IOException,
       LoginException, URISyntaxException {
-    Configuration conf = new HdfsConfiguration();
-    cluster = new MiniDFSCluster(conf, 2, true, null);
-    fc = FileContext.getFileContext(cluster.getURI(), conf);
+    cluster = new MiniDFSCluster(CONF, 2, true, null);
+    cluster.waitClusterUp();
+    fc = FileContext.getFileContext(cluster.getURI(), CONF);
     defaultWorkingDirectory = fc.makeQualified( new Path("/user/" + 
         UnixUserGroupInformation.login().getUserName()));
     fc.mkdir(defaultWorkingDirectory, FileContext.DEFAULT_PERM, true);
   }
 
+  private static void restartCluster() throws IOException, LoginException {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+    cluster = new MiniDFSCluster(CONF, 1, false, null);
+    cluster.waitClusterUp();
+    fc = FileContext.getFileContext(cluster.getURI(), CONF);
+    defaultWorkingDirectory = fc.makeQualified( new Path("/user/" + 
+        UnixUserGroupInformation.login().getUserName()));
+    fc.mkdir(defaultWorkingDirectory, FileContext.DEFAULT_PERM, true);
+  }
       
   @AfterClass
   public static void ClusterShutdownAtEnd() throws Exception {
@@ -179,6 +191,64 @@ public class TestHDFSFileContextMainOperations extends
     rename(dst, src, true, false, true, Rename.OVERWRITE);
   }
   
+  /**
+   * Perform operations such as setting quota, deletion of files, rename and
+   * ensure system can apply edits log during startup.
+   */
+  @Test
+  public void testEditsLogOldRename() throws Exception {
+    DistributedFileSystem fs = (DistributedFileSystem) cluster.getFileSystem();
+    Path src1 = getTestRootPath("testEditsLogOldRename/srcdir/src1");
+    Path dst1 = getTestRootPath("testEditsLogOldRename/dstdir/dst1");
+    createFile(src1);
+    fs.mkdirs(dst1.getParent());
+    createFile(dst1);
+    
+    // Set quota so that dst1 parent cannot allow under it new files/directories 
+    fs.setQuota(dst1.getParent(), 2, FSConstants.QUOTA_DONT_SET);
+    // Free up quota for a subsequent rename
+    fs.delete(dst1, true);
+    oldRename(src1, dst1, true, false);
+    
+    // Restart the cluster and ensure the above operations can be
+    // loaded from the edits log
+    restartCluster();
+    fs = (DistributedFileSystem)cluster.getFileSystem();
+    src1 = getTestRootPath("testEditsLogOldRename/srcdir/src1");
+    dst1 = getTestRootPath("testEditsLogOldRename/dstdir/dst1");
+    Assert.assertFalse(fs.exists(src1));   // ensure src1 is already renamed
+    Assert.assertTrue(fs.exists(dst1));    // ensure rename dst exists
+  }
+  
+  /**
+   * Perform operations such as setting quota, deletion of files, rename and
+   * ensure system can apply edits log during startup.
+   */
+  @Test
+  public void testEditsLogRename() throws Exception {
+    DistributedFileSystem fs = (DistributedFileSystem) cluster.getFileSystem();
+    Path src1 = getTestRootPath("testEditsLogRename/srcdir/src1");
+    Path dst1 = getTestRootPath("testEditsLogRename/dstdir/dst1");
+    createFile(src1);
+    fs.mkdirs(dst1.getParent());
+    createFile(dst1);
+    
+    // Set quota so that dst1 parent cannot allow under it new files/directories 
+    fs.setQuota(dst1.getParent(), 2, FSConstants.QUOTA_DONT_SET);
+    // Free up quota for a subsequent rename
+    fs.delete(dst1, true);
+    rename(src1, dst1, true, true, false, Rename.OVERWRITE);
+    
+    // Restart the cluster and ensure the above operations can be
+    // loaded from the edits log
+    restartCluster();
+    fs = (DistributedFileSystem)cluster.getFileSystem();
+    src1 = getTestRootPath("testEditsLogRename/srcdir/src1");
+    dst1 = getTestRootPath("testEditsLogRename/dstdir/dst1");
+    Assert.assertFalse(fs.exists(src1));   // ensure src1 is already renamed
+    Assert.assertTrue(fs.exists(dst1));    // ensure rename dst exists
+  }
+  
   private void oldRename(Path src, Path dst, boolean renameSucceeds,
       boolean exception) throws Exception {
     DistributedFileSystem fs = (DistributedFileSystem) cluster.getFileSystem();