Просмотр исходного кода

HDFS-5808. Implement cancellation when saving FSImage. Contributed by Haohui Mai.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-5698@1560793 13f79535-47bb-0310-9956-ffa450edef68
Jing Zhao 11 лет назад
Родитель
Коммит
57f27ec60c

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5698.txt

@@ -22,3 +22,5 @@ HDFS-5698 subtasks
 
     HDFS-5824. Add a Type field in Snapshot DiffEntry's protobuf definition.
     (jing9)
+
+    HDFS-5808. Implement cancellation when saving FSImage. (Haohui Mai via jing9)

+ 15 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java

@@ -314,22 +314,26 @@ public final class FSImageFormatPBINode {
 
     private final FSNamesystem fsn;
     private final FileSummary.Builder summary;
+    private final SaveNamespaceContext context;
     private final FSImageFormatProtobuf.Saver parent;
 
     Saver(FSImageFormatProtobuf.Saver parent, FileSummary.Builder summary) {
       this.parent = parent;
       this.summary = summary;
-      this.fsn = parent.context.getSourceNamesystem();
+      this.context = parent.getContext();
+      this.fsn = context.getSourceNamesystem();
     }
 
     void serializeINodeDirectorySection(OutputStream out) throws IOException {
       Iterator<INodeWithAdditionalFields> iter = fsn.getFSDirectory()
           .getINodeMap().getMapIterator();
+      int i = 0;
       while (iter.hasNext()) {
         INodeWithAdditionalFields n = iter.next();
         if (!n.isDirectory()) {
           continue;
         }
+
         ReadOnlyList<INode> children = n.asDirectory().getChildrenList(
             Snapshot.CURRENT_STATE_ID);
         if (children.size() > 0) {
@@ -351,6 +355,11 @@ public final class FSImageFormatPBINode {
             rb.build().writeDelimitedTo(out);
           }
         }
+
+        ++i;
+        if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) {
+          context.checkCancelled();
+        }
       }
       parent.commitSection(summary,
           FSImageFormatProtobuf.SectionName.INODE_DIR);
@@ -364,10 +373,15 @@ public final class FSImageFormatPBINode {
       INodeSection s = b.build();
       s.writeDelimitedTo(out);
 
+      int i = 0;
       Iterator<INodeWithAdditionalFields> iter = inodesMap.getMapIterator();
       while (iter.hasNext()) {
         INodeWithAdditionalFields n = iter.next();
         save(out, n);
+        ++i;
+        if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) {
+          context.checkCancelled();
+        }
       }
       parent.commitSection(summary, FSImageFormatProtobuf.SectionName.INODE);
     }

+ 11 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java

@@ -291,7 +291,7 @@ public final class FSImageFormatProtobuf {
   }
 
   public static final class Saver {
-    final SaveNamespaceContext context;
+    private final SaveNamespaceContext context;
     private long currentOffset = MAGIC_HEADER.length;
     private MD5Hash savedDigest;
     private StringMap stringMap = new StringMap();
@@ -301,6 +301,7 @@ public final class FSImageFormatProtobuf {
     private OutputStream sectionOutputStream;
     private CompressionCodec codec;
     private OutputStream underlyingOutputStream;
+    public static final int CHECK_CANCEL_INTERVAL = 4096;
 
     Saver(SaveNamespaceContext context) {
       this.context = context;
@@ -310,6 +311,10 @@ public final class FSImageFormatProtobuf {
       return savedDigest;
     }
 
+    public SaveNamespaceContext getContext() {
+      return context;
+    }
+
     public void commitSection(FileSummary.Builder summary, SectionName name)
         throws IOException {
       long oldOffset = currentOffset;
@@ -363,7 +368,7 @@ public final class FSImageFormatProtobuf {
     private void saveSnapshots(FileSummary.Builder summary) throws IOException {
       FSImageFormatPBSnapshot.Saver snapshotSaver =
           new FSImageFormatPBSnapshot.Saver(this, summary,
-              context.getSourceNamesystem());
+              context, context.getSourceNamesystem());
       snapshotSaver.serializeSnapshotSection(sectionOutputStream);
       snapshotSaver.serializeSnapshotDiffSection(sectionOutputStream);
     }
@@ -390,6 +395,10 @@ public final class FSImageFormatProtobuf {
       }
 
       saveNameSystemSection(b);
+      // Check for cancellation right after serializing the name system section.
+      // Some unit tests, such as TestSaveNamespace#testCancelSaveNameSpace
+      // depend on this behavior.
+      context.checkCancelled();
       saveInodes(b);
       saveSnapshots(b);
       saveStringTableSection(b);

+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SaveNamespaceContext.java

@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.util.Canceler;
 
@@ -32,7 +33,8 @@ import com.google.common.base.Preconditions;
  * allows cancellation, and also is responsible for accumulating
  * failed storage directories.
  */
-class SaveNamespaceContext {
+@InterfaceAudience.Private
+public class SaveNamespaceContext {
   private final FSNamesystem sourceNamesystem;
   private final long txid;
   private final List<StorageDirectory> errorSDs =
@@ -72,7 +74,7 @@ class SaveNamespaceContext {
     completionLatch.countDown();
   }
 
-  void checkCancelled() throws SaveNamespaceCancelledException {
+  public void checkCancelled() throws SaveNamespaceCancelledException {
     if (canceller.isCancelled()) {
       throw new SaveNamespaceCancelledException(
           canceller.getCancellationReason());

+ 12 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java

@@ -55,6 +55,7 @@ import org.apache.hadoop.hdfs.server.namenode.INodeFileAttributes;
 import org.apache.hadoop.hdfs.server.namenode.INodeMap;
 import org.apache.hadoop.hdfs.server.namenode.INodeReference;
 import org.apache.hadoop.hdfs.server.namenode.INodeWithAdditionalFields;
+import org.apache.hadoop.hdfs.server.namenode.SaveNamespaceContext;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature.DirectoryDiff;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature.DirectoryDiffList;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.Root;
@@ -280,11 +281,13 @@ public class FSImageFormatPBSnapshot {
     private final FSNamesystem fsn;
     private final FileSummary.Builder headers;
     private final FSImageFormatProtobuf.Saver parent;
+    private final SaveNamespaceContext context;
 
     public Saver(FSImageFormatProtobuf.Saver parent,
-        FileSummary.Builder headers, FSNamesystem fsn) {
+        FileSummary.Builder headers, SaveNamespaceContext context, FSNamesystem fsn) {
       this.parent = parent;
       this.headers = headers;
+      this.context = context;
       this.fsn = fsn;
     }
 
@@ -317,6 +320,9 @@ public class FSImageFormatPBSnapshot {
               .setDirectory(db).build();
           sb.setRoot(r).build().writeDelimitedTo(out);
           i++;
+          if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) {
+            context.checkCancelled();
+          }
         }
       }
       Preconditions.checkState(i == sm.getNumSnapshots());
@@ -329,6 +335,7 @@ public class FSImageFormatPBSnapshot {
     public void serializeSnapshotDiffSection(OutputStream out)
         throws IOException {
       INodeMap inodesMap = fsn.getFSDirectory().getINodeMap();
+      int i = 0;
       Iterator<INodeWithAdditionalFields> iter = inodesMap.getMapIterator();
       while (iter.hasNext()) {
         INodeWithAdditionalFields inode = iter.next();
@@ -337,6 +344,10 @@ public class FSImageFormatPBSnapshot {
         } else if (inode.isDirectory()) {
           serializeDirDiffList(inode.asDirectory(), out);
         }
+        ++i;
+        if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) {
+          context.checkCancelled();
+        }
       }
       parent.commitSection(headers,
           FSImageFormatProtobuf.SectionName.SNAPSHOT_DIFF);

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java

@@ -287,7 +287,6 @@ public class TestStandbyCheckpoints {
     doEdits(0, 1000);
     nn0.getRpcServer().rollEditLog();
     answerer.waitForCall();
-    answerer.proceed();
     assertTrue("SBN is not performing checkpoint but it should be.",
         answerer.getFireCount() == 1 && answerer.getResultCount() == 0);
     
@@ -306,6 +305,7 @@ public class TestStandbyCheckpoints {
     // RPC to the SBN happened during the checkpoint.
     assertTrue("SBN should have still been checkpointing.",
         answerer.getFireCount() == 1 && answerer.getResultCount() == 0);
+    answerer.proceed();
     answerer.waitForResult();
     assertTrue("SBN should have finished checkpointing.",
         answerer.getFireCount() == 1 && answerer.getResultCount() == 1);