Procházet zdrojové kódy

HDFS-4524. Update SnapshotManager#snapshottables when loading fsimage. Contributed by Jing Zhao

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2802@1449265 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze před 12 roky
rodič
revize
0deff1727e

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt

@@ -167,3 +167,6 @@ Branch-2802 Snapshot (Unreleased)
 
   HDFS-4499. Fix file/directory/snapshot deletion for file diff.  (Jing Zhao
   via szetszwo)
+
+  HDFS-4524. Update SnapshotManager#snapshottables when loading fsimage.
+  (Jing Zhao via szetszwo)

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java

@@ -413,6 +413,10 @@ public class FSImageFormat {
       if (numSnapshots >= 0) {
         final INodeDirectorySnapshottable snapshottableParent
             = INodeDirectorySnapshottable.valueOf(parent, parentPath);
+        if (snapshottableParent.getParent() != null) { // not root
+          this.namesystem.getSnapshotManager().addSnapshottable(
+              snapshottableParent);
+        }
         // load snapshots and snapshotQuota
         SnapshotFSImageFormat.loadSnapshotList(snapshottableParent,
             numSnapshots, in, this);

+ 9 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java

@@ -80,6 +80,15 @@ public class SnapshotManager implements SnapshotStats {
     snapshottables.add(s);
     numSnapshottableDirs.getAndIncrement();
   }
+  
+  /**
+   * Add a snapshottable dir into {@link #snapshottables}. Called when loading
+   * fsimage.
+   * @param dir The snapshottable dir to be added.
+   */
+  public void addSnapshottable(INodeDirectorySnapshottable dir) {
+    snapshottables.add(dir);
+  }
 
   /**
    * Set the given snapshottable directory to non-snapshottable.

+ 23 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java

@@ -21,7 +21,9 @@ import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.util.ArrayList;
 import java.util.EnumSet;
+import java.util.List;
 import java.util.Random;
 
 import org.apache.commons.logging.impl.Log4JLogger;
@@ -33,11 +35,13 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
+import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
 import org.apache.hadoop.hdfs.util.Canceler;
 import org.apache.log4j.Level;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -214,6 +218,10 @@ public class TestFSImageWithSnapshot {
     
     // save the namesystem to a temp file
     File imageFile = saveFSImageToTempFile();
+    
+    long numSdirBefore = fsn.getNumSnapshottableDirs();
+    long numSnapshotBefore = fsn.getNumSnapshots();
+    SnapshottableDirectoryStatus[] dirBefore = hdfs.getSnapshottableDirListing();
 
     // restart the cluster, and format the cluster
     cluster.shutdown();
@@ -231,6 +239,21 @@ public class TestFSImageWithSnapshot {
     
     // compare two dumped tree
     SnapshotTestHelper.compareDumpedTreeInFile(fsnBefore, fsnAfter);
+    
+    long numSdirAfter = fsn.getNumSnapshottableDirs();
+    long numSnapshotAfter = fsn.getNumSnapshots();
+    SnapshottableDirectoryStatus[] dirAfter = hdfs.getSnapshottableDirListing();
+    
+    Assert.assertEquals(numSdirBefore, numSdirAfter);
+    Assert.assertEquals(numSnapshotBefore, numSnapshotAfter);
+    Assert.assertEquals(dirBefore.length, dirAfter.length);
+    List<String> pathListBefore = new ArrayList<String>();
+    for (SnapshottableDirectoryStatus sBefore : dirBefore) {
+      pathListBefore.add(sBefore.getFullPath().toString());
+    }
+    for (SnapshottableDirectoryStatus sAfter : dirAfter) {
+      Assert.assertTrue(pathListBefore.contains(sAfter.getFullPath().toString()));
+    }
   }
   
   /**