Jelajahi Sumber

HDFS-5772. Serialize under-construction file information in FSImage. Contributed by Jing Zhao.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-5698@1558244 13f79535-47bb-0310-9956-ffa450edef68
Jing Zhao 11 tahun lalu
induk
melakukan
02197c3874

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5698.txt

@@ -3,3 +3,5 @@ HDFS-5698 subtasks
     HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9) 
 
     HDFS-5738. Serialize INode information in protobuf. (Haohui Mai via jing9)
+
+    HDFS-5772. Serialize under-construction file information in FSImage. (jing9)

+ 73 - 23
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java

@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -30,12 +31,15 @@ import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FilesUnderConstructionSection.FileUnderConstructionEntry;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeDirectorySection;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.Permission;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+import org.apache.hadoop.hdfs.util.ReadOnlyList;
 
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
@@ -56,14 +60,13 @@ final class FSImageFormatPBINode {
     }
 
     void loadINodeDirectorySection(InputStream in) throws IOException {
-      final INodeMap inodeMap = dir.getINodeMap();
       while (true) {
         INodeDirectorySection.DirEntry e = INodeDirectorySection.DirEntry
             .parseDelimitedFrom(in);
-
-        if (e == null)
+        // note that in is a LimitedInputStream
+        if (e == null) {
           break;
-
+        }
         INodeDirectory p = dir.getInode(e.getParent()).asDirectory();
         for (long id : e.getChildrenList()) {
           INode child = dir.getInode(id);
@@ -86,6 +89,24 @@ final class FSImageFormatPBINode {
       }
     }
 
+    /**
+     * Load the under-construction files section, and update the lease map
+     */
+    void loadFilesUnderConstructionSection(InputStream in) throws IOException {
+      while (true) {
+        FileUnderConstructionEntry entry = FileUnderConstructionEntry
+            .parseDelimitedFrom(in);
+        if (entry == null) {
+          break;
+        }
+        // update the lease manager
+        INodeFile file = fsn.dir.getInode(entry.getInodeId()).asFile();
+        FileUnderConstructionFeature uc = file.getFileUnderConstructionFeature();
+        Preconditions.checkState(uc != null); // file must be under-construction
+        fsn.leaseManager.addLease(uc.getClientName(), entry.getFullPath());
+      }
+    }
+
     private void addToParent(INodeDirectory parent, INode child) {
       FSDirectory fsDir = fsn.dir;
       if (parent == fsDir.rootDir && FSDirectory.isReservedName(child)) {
@@ -142,11 +163,23 @@ final class FSImageFormatPBINode {
       for (int i = 0, e = bp.size(); i < e; ++i) {
         blocks[i] = new BlockInfo(PBHelper.convert(bp.get(i)), replication);
       }
-
       final PermissionStatus permissions = loadPermission(f.getPermission());
+
       final INodeFile file = new INodeFile(n.getId(),
           n.getName().toByteArray(), permissions, f.getModificationTime(),
           f.getAccessTime(), blocks, replication, f.getPreferredBlockSize());
+      // under-construction information
+      if (f.hasFileUC()) {
+        INodeSection.FileUnderConstructionFeature uc = f.getFileUC();
+        file.toUnderConstruction(uc.getClientName(), uc.getClientMachine(),
+            null);
+        if (blocks.length > 0) {
+          BlockInfo lastBlk = file.getLastBlock();
+          // replace the last block of file
+          file.setBlock(file.numBlocks() - 1, new BlockInfoUnderConstruction(
+              lastBlk, replication));
+        }
+      }
       return file;
     }
 
@@ -193,15 +226,14 @@ final class FSImageFormatPBINode {
         if (!n.isDirectory())
           continue;
 
-        INodeDirectory d = n.asDirectory();
-
-        INodeDirectorySection.DirEntry.Builder b = INodeDirectorySection.DirEntry
-            .newBuilder().setParent(n.getId());
-
-        for (INode inode : d.getChildrenList(Snapshot.CURRENT_STATE_ID))
-          b.addChildren(inode.getId());
-
-        if (b.getChildrenCount() != 0) {
+        ReadOnlyList<INode> children = n.asDirectory().getChildrenList(
+            Snapshot.CURRENT_STATE_ID);
+        if (children.size() > 0) {
+          INodeDirectorySection.DirEntry.Builder b = INodeDirectorySection.
+              DirEntry.newBuilder().setParent(n.getId());
+          for (INode inode : children) {
+            b.addChildren(inode.getId());
+          }
           INodeDirectorySection.DirEntry e = b.build();
           e.writeDelimitedTo(out);
         }
@@ -211,23 +243,32 @@ final class FSImageFormatPBINode {
     }
 
     void serializeINodeSection() throws IOException {
-      final INodeDirectory rootDir = fsn.dir.rootDir;
-      final long numINodes = rootDir.getDirectoryWithQuotaFeature()
-          .getSpaceConsumed().get(Quota.NAMESPACE);
+      INodeMap inodesMap = fsn.dir.getINodeMap();
       INodeSection.Builder b = INodeSection.newBuilder()
-          .setLastInodeId(fsn.getLastInodeId()).setNumInodes(numINodes);
+          .setLastInodeId(fsn.getLastInodeId()).setNumInodes(inodesMap.size());
       INodeSection s = b.build();
       s.writeDelimitedTo(out);
 
-      long i = 0;
-      for (INodeWithAdditionalFields n : fsn.dir.getINodeMap().getMap()) {
+      for (INodeWithAdditionalFields n : inodesMap.getMap()) {
         save(n);
-        ++i;
       }
-      Preconditions.checkState(numINodes == i);
       parent.commitSection(headers, FSImageFormatProtobuf.SectionName.INODE);
     }
 
+    void serializeFilesUCSection() throws IOException {
+      Map<String, INodeFile> ucMap = fsn.getFilesUnderConstruction();
+      for (Map.Entry<String, INodeFile> entry : ucMap.entrySet()) {
+        String path = entry.getKey();
+        INodeFile file = entry.getValue();
+        FileUnderConstructionEntry.Builder b = FileUnderConstructionEntry
+            .newBuilder().setInodeId(file.getId()).setFullPath(path);
+        FileUnderConstructionEntry e = b.build();
+        e.writeDelimitedTo(out);
+      }
+      parent.commitSection(headers,
+          FSImageFormatProtobuf.SectionName.FILES_UNDERCONSTRUCTION);
+    }
+
     private INodeSection.Permission.Builder buildPermissionStatus(INode n) {
       return INodeSection.Permission.newBuilder().setUser(n.getUserName())
           .setGroup(n.getGroupName()).setPermission(n.getFsPermissionShort());
@@ -263,8 +304,17 @@ final class FSImageFormatPBINode {
           .setPreferredBlockSize(n.getPreferredBlockSize())
           .setReplication(n.getFileReplication());
 
-      for (Block block : n.getBlocks())
+      for (Block block : n.getBlocks()) {
         b.addBlocks(PBHelper.convert(block));
+      }
+
+      FileUnderConstructionFeature uc = n.getFileUnderConstructionFeature();
+      if (uc != null) {
+        INodeSection.FileUnderConstructionFeature f = INodeSection.FileUnderConstructionFeature
+            .newBuilder().setClientName(uc.getClientName())
+            .setClientMachine(uc.getClientMachine()).build();
+        b.setFileUC(f);
+      }
 
       INodeSection.INode r = INodeSection.INode.newBuilder()
           .setType(INodeSection.INode.Type.FILE).setId(n.getId())

+ 9 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java

@@ -54,10 +54,10 @@ import com.google.protobuf.CodedOutputStream;
  */
 final class FSImageFormatProtobuf {
   private static final Log LOG = LogFactory.getLog(FSImageFormatProtobuf.class);
-  
+
   static final byte[] MAGIC_HEADER = "HDFSIMG1".getBytes();
   private static final int FILE_VERSION = 1;
-  
+
   static final class Loader implements FSImageFormat.AbstractLoader {
     private static final int MINIMUM_FILE_LENGTH = 8;
     private final Configuration conf;
@@ -171,6 +171,9 @@ final class FSImageFormatProtobuf {
         case INODE_DIR:
           inodeLoader.loadINodeDirectorySection(in);
           break;
+        case FILES_UNDERCONSTRUCTION:
+          inodeLoader.loadFilesUnderConstructionSection(in);
+          break;
         default:
           LOG.warn("Unregconized section " + n);
           break;
@@ -244,6 +247,7 @@ final class FSImageFormatProtobuf {
           out, summary);
       saver.serializeINodeSection();
       saver.serializeINodeDirectorySection();
+      saver.serializeFilesUCSection();
     }
 
     private void saveInternal(FileOutputStream fout,
@@ -306,7 +310,8 @@ final class FSImageFormatProtobuf {
    * Supported section name
    */
   enum SectionName {
-    INODE("INODE"), INODE_DIR("INODE_DIR"), NS_INFO("NS_INFO");
+    INODE("INODE"), INODE_DIR("INODE_DIR"), NS_INFO("NS_INFO"),
+    FILES_UNDERCONSTRUCTION("FILES_UNDERCONSTRUCTION");
 
     private static final SectionName[] values = SectionName.values();
 
@@ -324,7 +329,7 @@ final class FSImageFormatProtobuf {
       this.name = name;
     }
   }
-  
+
   private static int getOndiskTrunkSize(com.google.protobuf.GeneratedMessage s) {
     return CodedOutputStream.computeRawVarint32Size(s.getSerializedSize())
         + s.getSerializedSize();

+ 9 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -5979,6 +5979,15 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     }
   }
 
+  /**
+   * @return all the under-construction files in the lease map
+   */
+  Map<String, INodeFile> getFilesUnderConstruction() {
+    synchronized (leaseManager) {
+      return leaseManager.getINodesUnderConstruction();
+    }
+  }
+
   /**
    * Register a Backup name-node, verifying that it belongs
    * to the correct namespace, and adding it to the set of

+ 31 - 10
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto

@@ -42,7 +42,7 @@ import "hdfs.proto";
  * that there always will be an integer indicates the size of the
  * protobuf message.
  *
- **/
+ */
 
 message FileSummary {
   // The version of the above EBNF grammars.
@@ -51,6 +51,7 @@ message FileSummary {
   // FSImage.
   required uint32 layoutVersion = 2;
   optional string codec         = 3;
+  // index for each section
   message Section {
     optional string name = 1;
     optional uint64 length = 2;
@@ -61,7 +62,7 @@ message FileSummary {
 
 /**
  * Name: NS_INFO
- **/
+ */
 message NameSystemSection {
   optional uint32 namespaceId = 1;
   optional uint64 genstampV1 = 2;
@@ -73,7 +74,7 @@ message NameSystemSection {
 
 /**
  * Name: INODE
- **/
+ */
 message INodeSection {
   message Permission {
     optional string user = 1;
@@ -81,6 +82,14 @@ message INodeSection {
     optional uint32 permission = 3;
   }
 
+  /**
+   * under-construction feature for INodeFile
+   */
+  message FileUnderConstructionFeature {
+    optional string clientName = 1;
+    optional string clientMachine = 2;
+  }
+
   message INodeFile {
     optional uint32 replication = 1;
     optional uint64 modificationTime = 2;
@@ -88,6 +97,7 @@ message INodeSection {
     optional uint64 preferredBlockSize = 4;
     optional Permission permission = 5;
     repeated BlockProto blocks = 6;
+    optional FileUnderConstructionFeature fileUC = 7;
   }
 
   message INodeDirectory {
@@ -108,12 +118,10 @@ message INodeSection {
     };
     required Type type = 1;
     required uint64 id = 2;
-
-    optional INodeFile file = 3;
-    optional INodeDirectory directory = 4;
-
-    optional bytes name = 16;
-    repeated bytes features = 17;
+    optional bytes name = 3;
+    
+    optional INodeFile file = 4;
+    optional INodeDirectory directory = 5;
   }
 
   optional uint64 lastInodeId = 1;
@@ -121,10 +129,23 @@ message INodeSection {
   // repeated INodes..
 }
 
+/**
+ * This section records information about under-construction files for
+ * reconstructing the lease map.
+ * NAME: FILES_UNDERCONSTRUCTION
+ */
+message FilesUnderConstructionSection {
+  message FileUnderConstructionEntry {
+    optional uint64 inodeId = 1;
+    optional string fullPath = 2;
+  }
+  // repeated FileUnderConstructionEntry...
+}
+
 /**
  * This section records the children of each directories
  * NAME: INODE_DIR
- **/
+ */
 message INodeDirectorySection {
   message DirEntry {
     optional uint64 parent = 1;

+ 66 - 10
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java

@@ -17,33 +17,89 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
+import java.util.EnumSet;
+
+import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSOutputStream;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
-import org.junit.Assert;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 public class TestFSImage {
+
+  private MiniDFSCluster cluster;
+  private Configuration conf;
+  private DistributedFileSystem fs;
+  private FSNamesystem fsn;
+
+  @Before
+  public void setUp() throws IOException {
+    conf = new Configuration();
+    cluster = new MiniDFSCluster.Builder(conf).build();
+    cluster.waitActive();
+    fsn = cluster.getNamesystem();
+    fs = cluster.getFileSystem();
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
   @Test
   public void testINode() throws IOException {
-    Configuration conf = new Configuration();
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
-    cluster.waitActive();
-    DistributedFileSystem fs = cluster.getFileSystem();
-    fs.mkdirs(new Path("/abc/def"));
-    fs.create(new Path("/abc/def/e")).close();
+    final Path dir = new Path("/abc/def");
+    final Path file1 = new Path(dir, "f1");
+    final Path file2 = new Path(dir, "f2");
+
+    // create an empty file f1
+    fs.create(file1).close();
+
+    // create an under-construction file f2
+    FSDataOutputStream out = fs.create(file2);
+    out.writeBytes("hello");
+    ((DFSOutputStream) out.getWrappedStream()).hsync(EnumSet
+        .of(SyncFlag.UPDATE_LENGTH));
+
+    // checkpoint
     fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
     fs.saveNamespace();
     fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
+
     cluster.restartNameNode();
     cluster.waitActive();
     fs = cluster.getFileSystem();
-    Assert.assertTrue(fs.exists(new Path("/abc/def/e")));
-    Assert.assertTrue(fs.isDirectory(new Path("/abc/def")));
-    cluster.shutdown();
+
+    assertTrue(fs.isDirectory(dir));
+    assertTrue(fs.exists(file1));
+    assertTrue(fs.exists(file2));
+
+    // check internals of file2
+    INodeFile file2Node = fsn.dir.getINode4Write(file2.toString()).asFile();
+    assertEquals("hello".length(), file2Node.computeFileSize());
+    assertTrue(file2Node.isUnderConstruction());
+    BlockInfo[] blks = file2Node.getBlocks();
+    assertEquals(1, blks.length);
+    assertEquals(BlockUCState.UNDER_CONSTRUCTION, blks[0].getBlockUCState());
+    // check lease manager
+    Lease lease = fsn.leaseManager.getLeaseByPath(file2.toString());
+    Assert.assertNotNull(lease);
   }
 }