
Merging changes r1039957:r1040005 from trunk to federation. The federation branch does not compile until the changes from HDFS-1533 are integrated.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-1052@1078923 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas 14 years ago
Parent
Commit
8c01a6ac23

+ 3 - 0
CHANGES.txt

@@ -474,6 +474,9 @@ Release 0.22.0 - Unreleased
 
     HDFS-1513. Fix a number of warnings. (eli)
 
+    HDFS-1473. Refactor storage management into separate classes than fsimage
+    file reading/writing. (Todd Lipcon via eli)
+
   OPTIMIZATIONS
 
     HDFS-1140. Speedup INode.getPathComponents. (Dmytro Molkov via shv)

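HDFS-1473 moves the fsimage file-format handling out of FSImage into dedicated classes (FSImageFormat, FSImageCompression, plus the FSImageSerialization helpers referenced by the call sites below). The following is a minimal sketch of the resulting call path, pieced together from the loadFSImage() and saveFSImage() hunks in FSImage.java further down; curFile, newFile, conf and namesystem are assumed to be in scope, and exceptions are left to propagate:

// Loading: FSImage hands file parsing to a one-shot FSImageFormat.Loader.
FSImageFormat.Loader loader = new FSImageFormat.Loader(conf);
loader.load(curFile, namesystem);
MD5Hash readImageMd5 = loader.getLoadedImageMd5();   // checked against the expected digest

// Saving: compression settings come from the Configuration and are handed to the Writer.
FSImageCompression compression = FSImageCompression.createCompression(conf);
FSImageFormat.Writer writer = new FSImageFormat.Writer();
writer.write(newFile, namesystem, compression);
MD5Hash savedMd5 = writer.getWrittenDigest();        // recorded as the image digest
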
+ 25 - 26
src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java

@@ -148,7 +148,7 @@ public class FSEditLogLoader {
                                     " but writables.length is " +
                                     length + ". ");
           }
-          path = FSImage.readString(in);
+          path = FSImageSerialization.readString(in);
           short replication = fsNamesys.adjustReplication(readShort(in));
           mtime = readLong(in);
           if (logVersion <= -17) {
@@ -182,8 +182,8 @@ public class FSEditLogLoader {
 
           // clientname, clientMachine and block locations of last block.
           if (opcode == Ops.OP_ADD && logVersion <= -12) {
-            clientName = FSImage.readString(in);
-            clientMachine = FSImage.readString(in);
+            clientName = FSImageSerialization.readString(in);
+            clientMachine = FSImageSerialization.readString(in);
             if (-13 <= logVersion) {
               readDatanodeDescriptorArray(in);
             }
@@ -231,7 +231,7 @@ public class FSEditLogLoader {
         } 
         case Ops.OP_SET_REPLICATION: {
           numOpSetRepl++;
-          path = FSImage.readString(in);
+          path = FSImageSerialization.readString(in);
           short replication = fsNamesys.adjustReplication(readShort(in));
           fsDir.unprotectedSetReplication(path, replication, null);
           break;
@@ -247,11 +247,11 @@ public class FSEditLogLoader {
             throw new IOException("Incorrect data format. " 
                                   + "Mkdir operation.");
           }
-          String trg = FSImage.readString(in);
+          String trg = FSImageSerialization.readString(in);
           int srcSize = length - 1 - 1; //trg and timestamp
           String [] srcs = new String [srcSize];
           for(int i=0; i<srcSize;i++) {
-            srcs[i]= FSImage.readString(in);
+            srcs[i]= FSImageSerialization.readString(in);
           }
           timestamp = readLong(in);
           fsDir.unprotectedConcat(trg, srcs);
@@ -264,8 +264,8 @@ public class FSEditLogLoader {
             throw new IOException("Incorrect data format. " 
                                   + "Mkdir operation.");
           }
-          String s = FSImage.readString(in);
-          String d = FSImage.readString(in);
+          String s = FSImageSerialization.readString(in);
+          String d = FSImageSerialization.readString(in);
           timestamp = readLong(in);
           HdfsFileStatus dinfo = fsDir.getFileInfo(d, false);
           fsDir.unprotectedRenameTo(s, d, timestamp);
@@ -279,7 +279,7 @@ public class FSEditLogLoader {
             throw new IOException("Incorrect data format. " 
                                   + "delete operation.");
           }
-          path = FSImage.readString(in);
+          path = FSImageSerialization.readString(in);
           timestamp = readLong(in);
           fsDir.unprotectedDelete(path, timestamp);
           break;
@@ -293,7 +293,7 @@ public class FSEditLogLoader {
             throw new IOException("Incorrect data format. " 
                                   + "Mkdir operation.");
           }
-          path = FSImage.readString(in);
+          path = FSImageSerialization.readString(in);
           timestamp = readLong(in);
 
           // The disk format stores atimes for directories as well.
@@ -317,9 +317,8 @@ public class FSEditLogLoader {
         } 
         case Ops.OP_DATANODE_ADD: {
           numOpOther++;
-          FSImage.DatanodeImage nodeimage = new FSImage.DatanodeImage();
-          nodeimage.readFields(in);
           //Datanodes are not persistent any more.
+          FSImageSerialization.DatanodeImage.skipOne(in);
           break;
         }
         case Ops.OP_DATANODE_REMOVE: {
@@ -335,7 +334,7 @@ public class FSEditLogLoader {
             throw new IOException("Unexpected opcode " + opcode
                                   + " for version " + logVersion);
           fsDir.unprotectedSetPermission(
-              FSImage.readString(in), FsPermission.read(in));
+              FSImageSerialization.readString(in), FsPermission.read(in));
           break;
         }
         case Ops.OP_SET_OWNER: {
@@ -343,9 +342,9 @@ public class FSEditLogLoader {
           if (logVersion > -11)
             throw new IOException("Unexpected opcode " + opcode
                                   + " for version " + logVersion);
-          fsDir.unprotectedSetOwner(FSImage.readString(in),
-              FSImage.readString_EmptyAsNull(in),
-              FSImage.readString_EmptyAsNull(in));
+          fsDir.unprotectedSetOwner(FSImageSerialization.readString(in),
+              FSImageSerialization.readString_EmptyAsNull(in),
+              FSImageSerialization.readString_EmptyAsNull(in));
           break;
         }
         case Ops.OP_SET_NS_QUOTA: {
@@ -353,7 +352,7 @@ public class FSEditLogLoader {
             throw new IOException("Unexpected opcode " + opcode
                 + " for version " + logVersion);
           }
-          fsDir.unprotectedSetQuota(FSImage.readString(in), 
+          fsDir.unprotectedSetQuota(FSImageSerialization.readString(in), 
                                     readLongWritable(in), 
                                     FSConstants.QUOTA_DONT_SET);
           break;
@@ -363,14 +362,14 @@ public class FSEditLogLoader {
             throw new IOException("Unexpected opcode " + opcode
                 + " for version " + logVersion);
           }
-          fsDir.unprotectedSetQuota(FSImage.readString(in),
+          fsDir.unprotectedSetQuota(FSImageSerialization.readString(in),
                                     FSConstants.QUOTA_RESET,
                                     FSConstants.QUOTA_DONT_SET);
           break;
         }
 
         case Ops.OP_SET_QUOTA:
-          fsDir.unprotectedSetQuota(FSImage.readString(in),
+          fsDir.unprotectedSetQuota(FSImageSerialization.readString(in),
                                     readLongWritable(in),
                                     readLongWritable(in));
                                       
@@ -383,7 +382,7 @@ public class FSEditLogLoader {
             throw new IOException("Incorrect data format. " 
                                   + "times operation.");
           }
-          path = FSImage.readString(in);
+          path = FSImageSerialization.readString(in);
           mtime = readLong(in);
           atime = readLong(in);
           fsDir.unprotectedSetTimes(path, mtime, atime, true);
@@ -396,8 +395,8 @@ public class FSEditLogLoader {
             throw new IOException("Incorrect data format. " 
                                   + "symlink operation.");
           }
-          path = FSImage.readString(in);
-          String value = FSImage.readString(in);
+          path = FSImageSerialization.readString(in);
+          String value = FSImageSerialization.readString(in);
           mtime = readLong(in);
           atime = readLong(in);
           PermissionStatus perm = PermissionStatus.read(in);
@@ -415,8 +414,8 @@ public class FSEditLogLoader {
             throw new IOException("Incorrect data format. " 
                                   + "Mkdir operation.");
           }
-          String s = FSImage.readString(in);
-          String d = FSImage.readString(in);
+          String s = FSImageSerialization.readString(in);
+          String d = FSImageSerialization.readString(in);
           timestamp = readLong(in);
           Rename[] options = readRenameOptions(in);
           HdfsFileStatus dinfo = fsDir.getFileInfo(d, false);
@@ -554,11 +553,11 @@ public class FSEditLogLoader {
   }
 
   static private short readShort(DataInputStream in) throws IOException {
-    return Short.parseShort(FSImage.readString(in));
+    return Short.parseShort(FSImageSerialization.readString(in));
   }
 
   static private long readLong(DataInputStream in) throws IOException {
-    return Long.parseLong(FSImage.readString(in));
+    return Long.parseLong(FSImageSerialization.readString(in));
   }
   
   // a place holder for reading a long

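FSEditLogLoader now resolves its string deserialization helpers through FSImageSerialization rather than FSImage. That class is not part of this merge (per the commit message it lands with HDFS-1533, which is why the branch does not compile yet); below is a hypothetical sketch of the helpers the call sites above assume, mirroring the static methods removed from FSImage.java further down in this diff:

import java.io.DataInputStream;
import java.io.IOException;
import org.apache.hadoop.hdfs.DeprecatedUTF8;

class FSImageSerialization {
  // Shared scratch buffer, as in the original FSImage helpers (single-threaded use assumed).
  private static final DeprecatedUTF8 U_STR = new DeprecatedUTF8();

  static String readString(DataInputStream in) throws IOException {
    U_STR.readFields(in);
    return U_STR.toString();
  }

  static String readString_EmptyAsNull(DataInputStream in) throws IOException {
    final String s = readString(in);
    return s.isEmpty() ? null : s;
  }
}
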
+ 25 - 613
src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java

@@ -17,11 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInput;
 import java.io.DataInputStream;
-import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
@@ -30,15 +26,10 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.net.URI;
 import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
 import java.security.NoSuchAlgorithmException;
 import java.security.SecureRandom;
-import java.security.DigestInputStream;
-import java.security.DigestOutputStream;
-import java.security.MessageDigest;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
@@ -55,22 +46,13 @@ import java.util.UUID;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.fs.permission.PermissionStatus;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.DeprecatedUTF8;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
-import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.common.UpgradeManager;
 import org.apache.hadoop.hdfs.server.common.Util;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NodeType;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
@@ -80,11 +62,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.io.MD5Hash;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.net.DNS;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 
 /**
@@ -165,32 +143,13 @@ public class FSImage extends Storage {
   private Collection<URI> checkpointDirs;
   private Collection<URI> checkpointEditsDirs;
 
-  /**
-   * Image compression related fields
-   */
-  private boolean compressImage = false;  // if image should be compressed
-  private CompressionCodec saveCodec;     // the compression codec
-  private CompressionCodecFactory codecFac;  // all the supported codecs
+  private Configuration conf;
 
   /**
    * Can fs-image be rolled?
    */
   volatile protected CheckpointStates ckptState = FSImage.CheckpointStates.START; 
 
-  /**
-   * Used for saving the image to disk
-   */
-  static private final ThreadLocal<FsPermission> FILE_PERM =
-                          new ThreadLocal<FsPermission>() {
-                            @Override
-                            protected FsPermission initialValue() {
-                              return new FsPermission((short) 0);
-                            }
-                          };
-  static private final byte[] PATH_SEPARATOR = DFSUtil.string2Bytes(Path.SEPARATOR);
-
-  private static final Random R = new Random();
-  
   /**
    */
   FSImage() {
@@ -203,6 +162,8 @@ public class FSImage extends Storage {
    */
   FSImage(Configuration conf) throws IOException {
     this();
+    this.conf = conf; // TODO we have too many constructors, this is a mess
+
     if(conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_KEY, 
         DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_DEFAULT)) {
       NameNode.LOG.info("set FSImage.restoreFailedStorage");
@@ -210,23 +171,11 @@ public class FSImage extends Storage {
     }
     setCheckpointDirectories(FSImage.getCheckpointDirs(conf, null),
         FSImage.getCheckpointEditsDirs(conf, null));
-    this.compressImage = conf.getBoolean(
-        DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY,
-        DFSConfigKeys.DFS_IMAGE_COMPRESS_DEFAULT);
-     this.codecFac = new CompressionCodecFactory(conf);
-     if (this.compressImage) {
-       String codecClassName = conf.get(
-           DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_KEY,
-           DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_DEFAULT);
-       this.saveCodec = codecFac.getCodecByClassName(codecClassName);
-       if (this.saveCodec == null) {
-         throw new IOException("Not supported codec: " + codecClassName);
-       }
-     }
-   }
- 
-  FSImage(FSNamesystem ns) {
+  }
+
+  private FSImage(FSNamesystem ns) {
     super(NodeType.NAME_NODE);
+    this.conf = new Configuration();
     this.editLog = new FSEditLog(this);
     setFSNamesystem(ns);
   }
@@ -713,7 +662,6 @@ public class FSImage extends Storage {
     // replace real image with the checkpoint image
     FSImage realImage = fsNamesys.getFSImage();
     assert realImage == this;
-    ckptImage.codecFac = realImage.codecFac;
     fsNamesys.dir.fsImage = ckptImage;
     // load from the checkpoint dirs
     try {
@@ -1158,234 +1106,30 @@ public class FSImage extends Storage {
    * "re-save" and consolidate the edit-logs
    */
   boolean loadFSImage(File curFile) throws IOException {
-    assert this.getLayoutVersion() < 0 : "Negative layout version is expected.";
-    assert curFile != null : "curFile is null";
-
-    long startTime = now();   
-    FSNamesystem fsNamesys = getFSNamesystem();
-    FSDirectory fsDir = fsNamesys.dir;
-    fsNamesys.setBlockPoolId(this.getBlockPoolID());
-
-    //
-    // Load in bits
-    //
-    boolean needToSave = true;
-    MessageDigest digester = MD5Hash.getDigester();
-    DigestInputStream fin = new DigestInputStream(
-         new FileInputStream(curFile), digester);
-
-    DataInputStream in = new DataInputStream(fin);
-    try {
-      /*
-       * Note: Remove any checks for version earlier than 
-       * Storage.LAST_UPGRADABLE_LAYOUT_VERSION since we should never get 
-       * to here with older images.
-       */
-      
-      /*
-       * TODO we need to change format of the image file
-       * it should not contain version and namespace fields
-       */
-      // read image version: first appeared in version -1
-      int imgVersion = in.readInt();
-      needToSave = (imgVersion != FSConstants.LAYOUT_VERSION);
-
-      // read namespaceID: first appeared in version -2
-      this.namespaceID = in.readInt();
-
-      // read number of files
-      long numFiles;
-      if (imgVersion <= -16) {
-        numFiles = in.readLong();
-      } else {
-        numFiles = in.readInt();
-      }
-
-      this.layoutVersion = imgVersion;
-      // read in the last generation stamp.
-      if (imgVersion <= -12) {
-        long genstamp = in.readLong();
-        fsNamesys.setGenerationStamp(genstamp); 
-      }
-
-      // read compression related info
-      boolean isCompressed = false;
-      if (imgVersion <= -25) {  // -25: 1st version providing compression option
-        isCompressed = in.readBoolean();
-        if (isCompressed) {
-          String codecClassName = Text.readString(in);
-          CompressionCodec loadCodec = codecFac.getCodecByClassName(codecClassName);
-          if (loadCodec == null) {
-            throw new IOException("Image compression codec not supported: "
-                                 + codecClassName);
-          }
-          in = new DataInputStream(loadCodec.createInputStream(fin));
-          LOG.info("Loading image file " + curFile +
-              " compressed using codec " + codecClassName);
-        }
-      }
-      if (!isCompressed) {
-        // use buffered input stream
-        in = new DataInputStream(new BufferedInputStream(fin));
-      }
-      
-      // read file info
-      short replication = fsNamesys.getDefaultReplication();
-
-      LOG.info("Number of files = " + numFiles);
-
-      byte[][] pathComponents;
-      byte[][] parentPath = {{}};
-      INodeDirectory parentINode = fsDir.rootDir;
-      for (long i = 0; i < numFiles; i++) {
-        long modificationTime = 0;
-        long atime = 0;
-        long blockSize = 0;
-        pathComponents = readPathComponents(in);
-        replication = in.readShort();
-        replication = fsNamesys.adjustReplication(replication);
-        modificationTime = in.readLong();
-        if (imgVersion <= -17) {
-          atime = in.readLong();
-        }
-        if (imgVersion <= -8) {
-          blockSize = in.readLong();
-        }
-        int numBlocks = in.readInt();
-        Block blocks[] = null;
-
-        // for older versions, a blocklist of size 0
-        // indicates a directory.
-        if ((-9 <= imgVersion && numBlocks > 0) ||
-            (imgVersion < -9 && numBlocks >= 0)) {
-          blocks = new Block[numBlocks];
-          for (int j = 0; j < numBlocks; j++) {
-            blocks[j] = new Block();
-            if (-14 < imgVersion) {
-              blocks[j].set(in.readLong(), in.readLong(), 
-                            GenerationStamp.GRANDFATHER_GENERATION_STAMP);
-            } else {
-              blocks[j].readFields(in);
-            }
-          }
-        }
-        // Older versions of HDFS does not store the block size in inode.
-        // If the file has more than one block, use the size of the 
-        // first block as the blocksize. Otherwise use the default block size.
-        //
-        if (-8 <= imgVersion && blockSize == 0) {
-          if (numBlocks > 1) {
-            blockSize = blocks[0].getNumBytes();
-          } else {
-            long first = ((numBlocks == 1) ? blocks[0].getNumBytes(): 0);
-            blockSize = Math.max(fsNamesys.getDefaultBlockSize(), first);
-          }
-        }
-        
-        // get quota only when the node is a directory
-        long nsQuota = -1L;
-        if (imgVersion <= -16 && blocks == null  && numBlocks == -1) {
-          nsQuota = in.readLong();
-        }
-        long dsQuota = -1L;
-        if (imgVersion <= -18 && blocks == null && numBlocks == -1) {
-          dsQuota = in.readLong();
-        }
+    FSImageFormat.Loader loader = new FSImageFormat.Loader(conf);
+    loader.load(curFile, getFSNamesystem());
 
-        // Read the symlink only when the node is a symlink
-        String symlink = "";
-        if (imgVersion <= -23 && numBlocks == -2) {
-          symlink = Text.readString(in);
-        }
-        
-        PermissionStatus permissions = fsNamesys.getUpgradePermission();
-        if (imgVersion <= -11) {
-          permissions = PermissionStatus.read(in);
-        }
-        
-        if (isRoot(pathComponents)) { // it is the root
-          // update the root's attributes
-          if (nsQuota != -1 || dsQuota != -1) {
-            fsDir.rootDir.setQuota(nsQuota, dsQuota);
-          }
-          fsDir.rootDir.setModificationTime(modificationTime);
-          fsDir.rootDir.setPermissionStatus(permissions);
-          continue;
-        }
-        // check if the new inode belongs to the same parent
-        if(!isParent(pathComponents, parentPath)) {
-          parentINode = null;
-          parentPath = getParent(pathComponents);
-        }
-        // add new inode
-        // without propagating modification time to parent
-        parentINode = fsDir.addToParent(pathComponents, parentINode, permissions,
-                                        blocks, symlink, replication, modificationTime, 
-                                        atime, nsQuota, dsQuota, blockSize, false);
-      }
-      
-      // load datanode info
-      this.loadDatanodes(imgVersion, in);
-
-      // load Files Under Construction
-      this.loadFilesUnderConstruction(imgVersion, in, fsNamesys);
-      
-      this.loadSecretManagerState(imgVersion, in, fsNamesys);
-      
-    } finally {
-      in.close();
-    }
+    namesystem.setBlockPoolId(this.getBlockPoolID());
 
-    // verify checksum
-    MD5Hash readImageMd5 = new MD5Hash(digester.digest());
+    // Check that the image digest we loaded matches up with what
+    // we expected
+    MD5Hash readImageMd5 = loader.getLoadedImageMd5();
     if (imageDigest == null) {
       imageDigest = readImageMd5; // set this fsimage's checksum
     } else if (!imageDigest.equals(readImageMd5)) {
-      throw new IOException("Image file " + curFile + 
-          "is corrupt with MD5 checksum of " + readImageMd5 +
+      throw new IOException("Image file " + curFile +
+          " is corrupt with MD5 checksum of " + readImageMd5 +
           " but expecting " + imageDigest);
     }
 
-    LOG.info("Image file of size " + curFile.length() + " loaded in " 
-        + (now() - startTime)/1000 + " seconds.");
+    this.namespaceID = loader.getLoadedNamespaceID();
+    this.layoutVersion = loader.getLoadedImageVersion();
 
+    boolean needToSave =
+      loader.getLoadedImageVersion() != FSConstants.LAYOUT_VERSION;
     return needToSave;
   }
 
-  /**
-   * Return string representing the parent of the given path.
-   */
-  String getParent(String path) {
-    return path.substring(0, path.lastIndexOf(Path.SEPARATOR));
-  }
-  
-  byte[][] getParent(byte[][] path) {
-    byte[][] result = new byte[path.length - 1][];
-    for (int i = 0; i < result.length; i++) {
-      result[i] = new byte[path[i].length];
-      System.arraycopy(path[i], 0, result[i], 0, path[i].length);
-    }
-    return result;
-  }
-
-  private boolean isRoot(byte[][] path) {
-    return path.length == 1 &&
-      path[0] == null;    
-  }
-
-  private boolean isParent(byte[][] path, byte[][] parent) {
-    if (path == null || parent == null)
-      return false;
-    if (parent.length == 0 || path.length != parent.length + 1)
-      return false;
-    boolean isParent = true;
-    for (int i = 0; i < parent.length; i++) {
-      isParent = isParent && Arrays.equals(path[i], parent[i]); 
-    }
-    return isParent;
-  }
-
-
   /**
    * Load and merge edits from two edits files
    * 
@@ -1420,57 +1164,10 @@ public class FSImage extends Storage {
    * Save the contents of the FS image to the file.
    */
   void saveFSImage(File newFile) throws IOException {
-    FSNamesystem fsNamesys = getFSNamesystem();
-    FSDirectory fsDir = fsNamesys.dir;
-    long startTime = now();
-    //
-    // Write out data
-    //
-    FileOutputStream fout = new FileOutputStream(newFile);
-    MessageDigest digester = MD5Hash.getDigester();
-    DigestOutputStream fos = new DigestOutputStream(fout, digester);
-
-    DataOutputStream out = new DataOutputStream(fos);
-    try {
-      out.writeInt(FSConstants.LAYOUT_VERSION);
-      out.writeInt(namespaceID);
-      out.writeLong(fsDir.rootDir.numItemsInTree());
-      out.writeLong(fsNamesys.getGenerationStamp());
-      
-      // write compression info
-      out.writeBoolean(compressImage);
-      if (compressImage) {
-        String codecClassName = saveCodec.getClass().getCanonicalName();
-        Text.writeString(out, codecClassName);
-        out = new DataOutputStream(saveCodec.createOutputStream(fos));
-        LOG.info("Saving image file " + newFile +
-            " compressed using codec " + codecClassName);
-      } else {
-        // use a buffered output stream
-        out = new DataOutputStream(new BufferedOutputStream(fos));
-      }
-
-      byte[] byteStore = new byte[4*FSConstants.MAX_PATH_LENGTH];
-      ByteBuffer strbuf = ByteBuffer.wrap(byteStore);
-      // save the root
-      saveINode2Image(strbuf, fsDir.rootDir, out);
-      // save the rest of the nodes
-      saveImage(strbuf, 0, fsDir.rootDir, out);
-      fsNamesys.saveFilesUnderConstruction(out);
-      fsNamesys.saveSecretManagerState(out);
-      strbuf = null;
-
-      out.flush();
-      fout.getChannel().force(true);
-    } finally {
-      out.close();
-    }
-
-    // set md5 of the saved image
-    setImageDigest( new MD5Hash(digester.digest()));
-
-    LOG.info("Image file of size " + newFile.length() + " saved in " 
-        + (now() - startTime)/1000 + " seconds.");
+    FSImageFormat.Writer writer = new FSImageFormat.Writer();
+    FSImageCompression compression = FSImageCompression.createCompression(conf);
+    writer.write(newFile, getFSNamesystem(), compression);
+    setImageDigest(writer.getWrittenDigest());
   }
 
   public void setImageDigest(MD5Hash digest) {
@@ -1723,6 +1420,7 @@ public class FSImage extends Storage {
     try {
       rand = SecureRandom.getInstance("SHA1PRNG").nextInt(Integer.MAX_VALUE);
     } catch (NoSuchAlgorithmException e) {
+      final Random R = new Random();
       LOG.warn("Could not use SecureRandom");
       rand = R.nextInt(Integer.MAX_VALUE);
     }
@@ -1758,204 +1456,6 @@ public class FSImage extends Storage {
     }
   }
 
-  /*
-   * Save one inode's attributes to the image.
-   */
-  private static void saveINode2Image(ByteBuffer name,
-                                      INode node,
-                                      DataOutputStream out) throws IOException {
-    int nameLen = name.position();
-    out.writeShort(nameLen);
-    out.write(name.array(), name.arrayOffset(), nameLen);
-    FsPermission filePerm = FILE_PERM.get();
-    if (node.isDirectory()) {
-      out.writeShort(0);  // replication
-      out.writeLong(node.getModificationTime());
-      out.writeLong(0);   // access time
-      out.writeLong(0);   // preferred block size
-      out.writeInt(-1);   // # of blocks
-      out.writeLong(node.getNsQuota());
-      out.writeLong(node.getDsQuota());
-      filePerm.fromShort(node.getFsPermissionShort());
-      PermissionStatus.write(out, node.getUserName(),
-                             node.getGroupName(),
-                             filePerm);
-    } else if (node.isLink()) {
-      out.writeShort(0);  // replication
-      out.writeLong(0);   // modification time
-      out.writeLong(0);   // access time
-      out.writeLong(0);   // preferred block size
-      out.writeInt(-2);   // # of blocks
-      Text.writeString(out, ((INodeSymlink)node).getLinkValue());
-      filePerm.fromShort(node.getFsPermissionShort());
-      PermissionStatus.write(out, node.getUserName(),
-                             node.getGroupName(),
-                             filePerm);      
-    } else {
-      INodeFile fileINode = (INodeFile)node;
-      out.writeShort(fileINode.getReplication());
-      out.writeLong(fileINode.getModificationTime());
-      out.writeLong(fileINode.getAccessTime());
-      out.writeLong(fileINode.getPreferredBlockSize());
-      Block[] blocks = fileINode.getBlocks();
-      out.writeInt(blocks.length);
-      for (Block blk : blocks)
-        blk.write(out);
-      filePerm.fromShort(fileINode.getFsPermissionShort());
-      PermissionStatus.write(out, fileINode.getUserName(),
-                             fileINode.getGroupName(),
-                             filePerm);
-    }
-  }
-  
-  /**
-   * Save file tree image starting from the given root.
-   * This is a recursive procedure, which first saves all children of
-   * a current directory and then moves inside the sub-directories.
-   */
-  private static void saveImage(ByteBuffer parentPrefix,
-                                int prefixLength,
-                                INodeDirectory current,
-                                DataOutputStream out) throws IOException {
-    int newPrefixLength = prefixLength;
-    if (current.getChildrenRaw() == null)
-      return;
-    for(INode child : current.getChildren()) {
-      // print all children first
-      parentPrefix.position(prefixLength);
-      parentPrefix.put(PATH_SEPARATOR).put(child.getLocalNameBytes());
-      saveINode2Image(parentPrefix, child, out);
-    }
-    for(INode child : current.getChildren()) {
-      if(!child.isDirectory())
-        continue;
-      parentPrefix.position(prefixLength);
-      parentPrefix.put(PATH_SEPARATOR).put(child.getLocalNameBytes());
-      newPrefixLength = parentPrefix.position();
-      saveImage(parentPrefix, newPrefixLength, (INodeDirectory)child, out);
-    }
-    parentPrefix.position(prefixLength);
-  }
-
-  void loadDatanodes(int version, DataInputStream in) throws IOException {
-    if (version > -3) // pre datanode image version
-      return;
-    if (version <= -12) {
-      return; // new versions do not store the datanodes any more.
-    }
-    int size = in.readInt();
-    for(int i = 0; i < size; i++) {
-      DatanodeImage nodeImage = new DatanodeImage();
-      nodeImage.readFields(in);
-      // We don't need to add these descriptors any more.
-    }
-  }
-
-  private void loadFilesUnderConstruction(int version, DataInputStream in, 
-      FSNamesystem fs) throws IOException {
-    FSDirectory fsDir = fs.dir;
-    if (version > -13) // pre lease image version
-      return;
-    int size = in.readInt();
-
-    LOG.info("Number of files under construction = " + size);
-
-    for (int i = 0; i < size; i++) {
-      INodeFileUnderConstruction cons = readINodeUnderConstruction(in);
-
-      // verify that file exists in namespace
-      String path = cons.getLocalName();
-      INode old = fsDir.getFileINode(path);
-      if (old == null) {
-        throw new IOException("Found lease for non-existent file " + path);
-      }
-      if (old.isDirectory()) {
-        throw new IOException("Found lease for directory " + path);
-      }
-      INodeFile oldnode = (INodeFile) old;
-      fsDir.replaceNode(path, oldnode, cons);
-      fs.leaseManager.addLease(cons.getClientName(), path); 
-    }
-  }
-
-  private void loadSecretManagerState(int version,  DataInputStream in, 
-      FSNamesystem fs) throws IOException {
-    if (version > -23) {
-      //SecretManagerState is not available.
-      //This must not happen if security is turned on.
-      return; 
-    }
-    fs.loadSecretManagerState(in);
-  }
-  
-  // Helper function that reads in an INodeUnderConstruction
-  // from the input stream
-  //
-  static INodeFileUnderConstruction readINodeUnderConstruction(
-                            DataInputStream in) throws IOException {
-    byte[] name = readBytes(in);
-    short blockReplication = in.readShort();
-    long modificationTime = in.readLong();
-    long preferredBlockSize = in.readLong();
-    int numBlocks = in.readInt();
-    BlockInfo[] blocks = new BlockInfo[numBlocks];
-    Block blk = new Block();
-    int i = 0;
-    for (; i < numBlocks-1; i++) {
-      blk.readFields(in);
-      blocks[i] = new BlockInfo(blk, blockReplication);
-    }
-    // last block is UNDER_CONSTRUCTION
-    if(numBlocks > 0) {
-      blk.readFields(in);
-      blocks[i] = new BlockInfoUnderConstruction(
-        blk, blockReplication, BlockUCState.UNDER_CONSTRUCTION, null);
-    }
-    PermissionStatus perm = PermissionStatus.read(in);
-    String clientName = readString(in);
-    String clientMachine = readString(in);
-
-    // These locations are not used at all
-    int numLocs = in.readInt();
-    DatanodeDescriptor[] locations = new DatanodeDescriptor[numLocs];
-    for (i = 0; i < numLocs; i++) {
-      locations[i] = new DatanodeDescriptor();
-      locations[i].readFields(in);
-    }
-
-    return new INodeFileUnderConstruction(name, 
-                                          blockReplication, 
-                                          modificationTime,
-                                          preferredBlockSize,
-                                          blocks,
-                                          perm,
-                                          clientName,
-                                          clientMachine,
-                                          null);
-  }
-
-  // Helper function that writes an INodeUnderConstruction
-  // into the input stream
-  //
-  static void writeINodeUnderConstruction(DataOutputStream out,
-                                           INodeFileUnderConstruction cons,
-                                           String path) 
-                                           throws IOException {
-    writeString(path, out);
-    out.writeShort(cons.getReplication());
-    out.writeLong(cons.getModificationTime());
-    out.writeLong(cons.getPreferredBlockSize());
-    int nrBlocks = cons.getBlocks().length;
-    out.writeInt(nrBlocks);
-    for (int i = 0; i < nrBlocks; i++) {
-      cons.getBlocks()[i].write(out);
-    }
-    cons.getPermissionStatus().write(out);
-    writeString(cons.getClientName(), out);
-    writeString(cons.getClientMachine(), out);
-
-    out.writeInt(0); //  do not store locations of last block
-  }
 
   /**
    * Moves fsimage.ckpt to fsImage and edits.new to edits
@@ -2300,50 +1800,6 @@ public class FSImage extends Storage {
     return null;
   }
 
-  /**
-   * DatanodeImage is used to store persistent information
-   * about datanodes into the fsImage.
-   */
-  static class DatanodeImage implements Writable {
-    DatanodeDescriptor node = new DatanodeDescriptor();
-
-    /////////////////////////////////////////////////
-    // Writable
-    /////////////////////////////////////////////////
-    /**
-     * Public method that serializes the information about a
-     * Datanode to be stored in the fsImage.
-     */
-    public void write(DataOutput out) throws IOException {
-      new DatanodeID(node).write(out);
-      out.writeLong(node.getCapacity());
-      out.writeLong(node.getRemaining());
-      out.writeLong(node.getLastUpdate());
-      out.writeInt(node.getXceiverCount());
-    }
-
-    /**
-     * Public method that reads a serialized Datanode
-     * from the fsImage.
-     */
-    public void readFields(DataInput in) throws IOException {
-      DatanodeID id = new DatanodeID();
-      id.readFields(in);
-      long capacity = in.readLong();
-      long remaining = in.readLong();
-      long lastUpdate = in.readLong();
-      int xceiverCount = in.readInt();
-
-      // update the DatanodeDescriptor with the data we read in
-      node.updateRegInfo(id);
-      node.setStorageID(id.getStorageID());
-      node.setCapacity(capacity);
-      node.setRemaining(remaining);
-      node.setLastUpdate(lastUpdate);
-      node.setXceiverCount(xceiverCount);
-    }
-  }
-
   private boolean getDistributedUpgradeState() {
     FSNamesystem ns = getFSNamesystem();
     return ns == null ? false : ns.getDistributedUpgradeState();
@@ -2412,51 +1868,7 @@ public class FSImage extends Storage {
       dirNames.add(defaultName);
     }
     return Util.stringCollectionAsURIs(dirNames);
-  }
-
-  static private final DeprecatedUTF8 U_STR = new DeprecatedUTF8();
-  // This should be reverted to package private once the ImageLoader
-  // code is moved into this package. This method should not be called
-  // by other code.
-  public static String readString(DataInputStream in) throws IOException {
-    U_STR.readFields(in);
-    return U_STR.toString();
-  }
-
-  static String readString_EmptyAsNull(DataInputStream in) throws IOException {
-    final String s = readString(in);
-    return s.isEmpty()? null: s;
-  }
-  
-  /**
-   * Reading the path from the image and converting it to byte[][] directly
-   * this saves us an array copy and conversions to and from String
-   * @param in
-   * @return the array each element of which is a byte[] representation 
-   *            of a path component
-   * @throws IOException
-   */
-  public static byte[][] readPathComponents(DataInputStream in)
-      throws IOException {
-      U_STR.readFields(in);
-      return DFSUtil.bytes2byteArray(U_STR.getBytes(),
-        U_STR.getLength(), (byte) Path.SEPARATOR_CHAR);
-    
-  }
-
-  // Same comments apply for this method as for readString()
-  public static byte[] readBytes(DataInputStream in) throws IOException {
-    U_STR.readFields(in);
-    int len = U_STR.getLength();
-    byte[] bytes = new byte[len];
-    System.arraycopy(U_STR.getBytes(), 0, bytes, 0, len);
-    return bytes;
-  }
-
-  static void writeString(String str, DataOutputStream out) throws IOException {
-    U_STR.set(str);
-    U_STR.write(out);
-  }
+  }  
 
   public String getBlockPoolID() {
     return blockpoolID;

+ 178 - 0
src/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java

@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * Simple container class that handles support for compressed fsimage files.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+class FSImageCompression {
+
+  /** Codec to use to save or load image, or null if the image is not compressed */
+  private CompressionCodec imageCodec;
+
+  /**
+   * Create a "noop" compression - i.e. uncompressed
+   */
+  private FSImageCompression() {
+  }
+
+  /**
+   * Create compression using a particular codec
+   */
+  private FSImageCompression(CompressionCodec codec) {
+    imageCodec = codec;
+  }
+
+  /**
+   * Create a "noop" compression - i.e. uncompressed
+   */
+  public static FSImageCompression createNoopCompression() {
+    return new FSImageCompression();
+  }
+
+  /**
+   * Create a compression instance based on the user's configuration in the given
+   * Configuration object.
+   * @throws IOException if the specified codec is not available.
+   */
+  public static FSImageCompression createCompression(Configuration conf)
+    throws IOException {
+    boolean compressImage = conf.getBoolean(
+      DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY,
+      DFSConfigKeys.DFS_IMAGE_COMPRESS_DEFAULT);
+
+    if (!compressImage) {
+      return createNoopCompression();
+    }
+
+    String codecClassName = conf.get(
+      DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_KEY,
+      DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_DEFAULT);
+    return createCompression(conf, codecClassName);
+  }
+
+  /**
+   * Create a compression instance using the codec specified by
+   * <code>codecClassName</code>
+   */
+  private static FSImageCompression createCompression(Configuration conf,
+                                                      String codecClassName)
+    throws IOException {
+
+    CompressionCodecFactory factory = new CompressionCodecFactory(conf);
+    CompressionCodec codec = factory.getCodecByClassName(codecClassName);
+    if (codec == null) {
+      throw new IOException("Not a supported codec: " + codecClassName);
+    }
+
+    return new FSImageCompression(codec);
+  }
+
+  /**
+   * Create a compression instance based on a header read from an input stream.
+   * @throws IOException if the specified codec is not available or the
+   * underlying IO fails.
+   */
+  public static FSImageCompression readCompressionHeader(
+    Configuration conf,
+    DataInputStream dis) throws IOException
+  {
+    boolean isCompressed = dis.readBoolean();
+
+    if (!isCompressed) {
+      return createNoopCompression();
+    } else {
+      String codecClassName = Text.readString(dis);
+      return createCompression(conf, codecClassName);
+    }
+  }
+  
+  /**
+   * Unwrap a compressed input stream by wrapping it with a decompressor based
+   * on this codec. If this instance represents no compression, simply adds
+   * buffering to the input stream.
+   * @return a buffered stream that provides uncompressed data
+   * @throws IOException If the decompressor cannot be instantiated or an IO
+   * error occurs.
+   */
+  public DataInputStream unwrapInputStream(InputStream is) throws IOException {
+    if (imageCodec != null) {
+      return new DataInputStream(imageCodec.createInputStream(is));
+    } else {
+      return new DataInputStream(new BufferedInputStream(is));
+    }
+  }
+
+  /**
+   * Write out a header to the given stream that indicates the chosen
+   * compression codec, and return the same stream wrapped with that codec.
+   * If no codec is specified, simply adds buffering to the stream, so that
+   * the returned stream is always buffered.
+   * 
+   * @param os The stream to write header to and wrap. This stream should
+   * be unbuffered.
+   * @return A stream wrapped with the specified compressor, or buffering
+   * if compression is not enabled.
+   * @throws IOException if an IO error occurs or the compressor cannot be
+   * instantiated
+   */
+  public DataOutputStream writeHeaderAndWrapStream(OutputStream os)
+  throws IOException {
+    DataOutputStream dos = new DataOutputStream(os);
+
+    dos.writeBoolean(imageCodec != null);
+
+    if (imageCodec != null) {
+      String codecClassName = imageCodec.getClass().getCanonicalName();
+      Text.writeString(dos, codecClassName);
+
+      return new DataOutputStream(imageCodec.createOutputStream(os));
+    } else {
+      // use a buffered output stream
+      return new DataOutputStream(new BufferedOutputStream(os));
+    }
+  }
+
+  @Override
+  public String toString() {
+    if (imageCodec != null) {
+      return "codec " + imageCodec.getClass().getCanonicalName();
+    } else {
+      return "no compression";
+    }
+  }
+}

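FSImageCompression encapsulates the codec negotiation that previously lived inline in FSImage. A short usage sketch under the same assumptions as FSImageFormat below (conf is a Configuration, fin a raw image input stream, fos an unbuffered output stream such as the DigestOutputStream used by the Writer):

// Writing: pick the codec from the config, emit the header, and wrap the stream.
FSImageCompression saveCompression = FSImageCompression.createCompression(conf);
DataOutputStream out = saveCompression.writeHeaderAndWrapStream(fos);

// Reading: recover the codec from the image header, then wrap the raw stream.
DataInputStream headerIn = new DataInputStream(fin);
FSImageCompression loadCompression = FSImageCompression.readCompressionHeader(conf, headerIn);
DataInputStream in = loadCompression.unwrapInputStream(fin);
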
+ 498 - 0
src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java

@@ -0,0 +1,498 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static org.apache.hadoop.hdfs.server.common.Util.now;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.DigestInputStream;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Contains inner classes for reading or writing the on-disk format for FSImages
+ */
+public abstract class FSImageFormat {
+  private static final Log LOG = FSImage.LOG;
+  
+  /**
+   * A one-shot class responsible for loading an image. The load() function
+   * should be called once, after which the getter methods may be used to retrieve
+   * information about the image that was loaded, if loading was successful.
+   */
+  public static class Loader {
+    private final Configuration conf;
+
+    /** Set to true once a file has been loaded using this loader. */
+    private boolean loaded = false;
+
+    /** The image version of the loaded file */
+    private int imgVersion;
+    /** The namespace ID of the loaded file */
+    private int imgNamespaceID;
+    /** The MD5 sum of the loaded file */
+    private MD5Hash imgDigest;
+
+    public Loader(Configuration conf) {
+      this.conf = conf;
+    }
+
+    /**
+     * Return the version number of the image that has been loaded.
+     * @throws IllegalStateException if load() has not yet been called.
+     */
+    int getLoadedImageVersion() {
+      checkLoaded();
+      return imgVersion;
+    }
+    
+    /**
+     * Return the MD5 checksum of the image that has been loaded.
+     * @throws IllegalStateException if load() has not yet been called.
+     */
+    MD5Hash getLoadedImageMd5() {
+      checkLoaded();
+      return imgDigest;
+    }
+
+    /**
+     * Return the namespace ID of the image that has been loaded.
+     * @throws IllegalStateException if load() has not yet been called.
+     */
+    int getLoadedNamespaceID() {
+      checkLoaded();
+      return imgNamespaceID;
+    }
+
+    /**
+     * Throw IllegalStateException if load() has not yet been called.
+     */
+    private void checkLoaded() {
+      if (!loaded) {
+        throw new IllegalStateException("Image not yet loaded!");
+      }
+    }
+
+    /**
+     * Throw IllegalStateException if load() has already been called.
+     */
+    private void checkNotLoaded() {
+      if (loaded) {
+        throw new IllegalStateException("Image already loaded!");
+      }
+    }
+
+    void load(File curFile, FSNamesystem targetNamesystem)
+      throws IOException
+    {
+      checkNotLoaded();
+      assert curFile != null : "curFile is null";
+
+      long startTime = now();
+      FSDirectory fsDir = targetNamesystem.dir;
+
+      //
+      // Load in bits
+      //
+      MessageDigest digester = MD5Hash.getDigester();
+      DigestInputStream fin = new DigestInputStream(
+           new FileInputStream(curFile), digester);
+
+      DataInputStream in = new DataInputStream(fin);
+      try {
+        /*
+         * Note: Remove any checks for version earlier than 
+         * Storage.LAST_UPGRADABLE_LAYOUT_VERSION since we should never get 
+         * to here with older images.
+         */
+
+        /*
+         * TODO we need to change format of the image file
+         * it should not contain version and namespace fields
+         */
+        // read image version: first appeared in version -1
+        imgVersion = in.readInt();
+
+        // read namespaceID: first appeared in version -2
+        imgNamespaceID = in.readInt();
+
+        // read number of files
+        long numFiles = readNumFiles(in);
+
+        // read in the last generation stamp.
+        if (imgVersion <= -12) {
+          long genstamp = in.readLong();
+          targetNamesystem.setGenerationStamp(genstamp); 
+        }
+
+        // read compression related info
+        FSImageCompression compression;
+        if (imgVersion <= -25) {  // -25: 1st version providing compression option
+          compression = FSImageCompression.readCompressionHeader(conf, in);
+        } else {
+          compression = FSImageCompression.createNoopCompression();
+        }
+        in = compression.unwrapInputStream(fin);
+
+        LOG.info("Loading image file " + curFile + " using " + compression);
+
+
+        // read file info
+        short replication = targetNamesystem.getDefaultReplication();
+
+        LOG.info("Number of files = " + numFiles);
+
+        byte[][] pathComponents;
+        byte[][] parentPath = {{}};
+        INodeDirectory parentINode = fsDir.rootDir;
+        for (long i = 0; i < numFiles; i++) {
+          long modificationTime = 0;
+          long atime = 0;
+          long blockSize = 0;
+          pathComponents = FSImageSerialization.readPathComponents(in);
+          replication = in.readShort();
+          replication = targetNamesystem.adjustReplication(replication);
+          modificationTime = in.readLong();
+          if (imgVersion <= -17) {
+            atime = in.readLong();
+          }
+          if (imgVersion <= -8) {
+            blockSize = in.readLong();
+          }
+          int numBlocks = in.readInt();
+          Block blocks[] = null;
+
+          // for older versions, a blocklist of size 0
+          // indicates a directory.
+          if ((-9 <= imgVersion && numBlocks > 0) ||
+              (imgVersion < -9 && numBlocks >= 0)) {
+            blocks = new Block[numBlocks];
+            for (int j = 0; j < numBlocks; j++) {
+              blocks[j] = new Block();
+              if (-14 < imgVersion) {
+                blocks[j].set(in.readLong(), in.readLong(), 
+                              GenerationStamp.GRANDFATHER_GENERATION_STAMP);
+              } else {
+                blocks[j].readFields(in);
+              }
+            }
+          }
+          // Older versions of HDFS does not store the block size in inode.
+          // If the file has more than one block, use the size of the 
+          // first block as the blocksize. Otherwise use the default block size.
+          //
+          if (-8 <= imgVersion && blockSize == 0) {
+            if (numBlocks > 1) {
+              blockSize = blocks[0].getNumBytes();
+            } else {
+              long first = ((numBlocks == 1) ? blocks[0].getNumBytes(): 0);
+              blockSize = Math.max(targetNamesystem.getDefaultBlockSize(), first);
+            }
+          }
+          
+          // get quota only when the node is a directory
+          long nsQuota = -1L;
+          if (imgVersion <= -16 && blocks == null  && numBlocks == -1) {
+            nsQuota = in.readLong();
+          }
+          long dsQuota = -1L;
+          if (imgVersion <= -18 && blocks == null && numBlocks == -1) {
+            dsQuota = in.readLong();
+          }
+
+          // Read the symlink only when the node is a symlink
+          String symlink = "";
+          if (imgVersion <= -23 && numBlocks == -2) {
+            symlink = Text.readString(in);
+          }
+          
+          PermissionStatus permissions = targetNamesystem.getUpgradePermission();
+          if (imgVersion <= -11) {
+            permissions = PermissionStatus.read(in);
+          }
+          
+          if (isRoot(pathComponents)) { // it is the root
+            // update the root's attributes
+            if (nsQuota != -1 || dsQuota != -1) {
+              fsDir.rootDir.setQuota(nsQuota, dsQuota);
+            }
+            fsDir.rootDir.setModificationTime(modificationTime);
+            fsDir.rootDir.setPermissionStatus(permissions);
+            continue;
+          }
+          // check if the new inode belongs to the same parent
+          if(!isParent(pathComponents, parentPath)) {
+            parentINode = null;
+            parentPath = getParent(pathComponents);
+          }
+          // add new inode
+          // without propagating modification time to parent
+          parentINode = fsDir.addToParent(pathComponents, parentINode, permissions,
+                                          blocks, symlink, replication, modificationTime, 
+                                          atime, nsQuota, dsQuota, blockSize, false);
+        }
+
+        // load datanode info
+        this.loadDatanodes(in);
+
+        // load Files Under Construction
+        this.loadFilesUnderConstruction(in, targetNamesystem);
+
+        this.loadSecretManagerState(in, targetNamesystem);
+
+      } finally {
+        in.close();
+      }
+
+      imgDigest = new MD5Hash(digester.digest());
+      loaded = true;
+      
+      LOG.info("Image file of size " + curFile.length() + " loaded in " 
+          + (now() - startTime)/1000 + " seconds.");
+    }
+
+
+    private void loadDatanodes(DataInputStream in) throws IOException {
+      if (imgVersion > -3) // pre datanode image version
+        return;
+      if (imgVersion <= -12) {
+        return; // new versions do not store the datanodes any more.
+      }
+      int size = in.readInt();
+      for(int i = 0; i < size; i++) {
+        // We don't need to add these descriptors any more.
+        FSImageSerialization.DatanodeImage.skipOne(in);
+      }
+    }
+
+    private void loadFilesUnderConstruction(DataInputStream in, 
+        FSNamesystem fs) throws IOException {
+      FSDirectory fsDir = fs.dir;
+      if (imgVersion > -13) // pre lease image version
+        return;
+      int size = in.readInt();
+
+      LOG.info("Number of files under construction = " + size);
+
+      for (int i = 0; i < size; i++) {
+        INodeFileUnderConstruction cons =
+          FSImageSerialization.readINodeUnderConstruction(in);
+
+        // verify that file exists in namespace
+        String path = cons.getLocalName();
+        INode old = fsDir.getFileINode(path);
+        if (old == null) {
+          throw new IOException("Found lease for non-existent file " + path);
+        }
+        if (old.isDirectory()) {
+          throw new IOException("Found lease for directory " + path);
+        }
+        INodeFile oldnode = (INodeFile) old;
+        fsDir.replaceNode(path, oldnode, cons);
+        fs.leaseManager.addLease(cons.getClientName(), path); 
+      }
+    }
+
+    private void loadSecretManagerState(DataInputStream in, 
+        FSNamesystem fs) throws IOException {
+      if (imgVersion > -23) {
+        //SecretManagerState is not available.
+        //This must not happen if security is turned on.
+        return; 
+      }
+      fs.loadSecretManagerState(in);
+    }
+
+
+    private long readNumFiles(DataInputStream in) throws IOException {
+      if (imgVersion <= -16) {
+        return in.readLong();
+      } else {
+        return in.readInt();
+      }
+    }
+
+    private boolean isRoot(byte[][] path) {
+      return path.length == 1 &&
+        path[0] == null;    
+    }
+
+    private boolean isParent(byte[][] path, byte[][] parent) {
+      if (path == null || parent == null)
+        return false;
+      if (parent.length == 0 || path.length != parent.length + 1)
+        return false;
+      boolean isParent = true;
+      for (int i = 0; i < parent.length; i++) {
+        isParent = isParent && Arrays.equals(path[i], parent[i]); 
+      }
+      return isParent;
+    }
+
+    /**
+     * Return string representing the parent of the given path.
+     */
+    String getParent(String path) {
+      return path.substring(0, path.lastIndexOf(Path.SEPARATOR));
+    }
+    
+    byte[][] getParent(byte[][] path) {
+      byte[][] result = new byte[path.length - 1][];
+      for (int i = 0; i < result.length; i++) {
+        result[i] = new byte[path[i].length];
+        System.arraycopy(path[i], 0, result[i], 0, path[i].length);
+      }
+      return result;
+    }
+  }
+  
+  /**
+   * A one-shot class responsible for writing an image file.
+   * The write() function should be called once, after which the getter
+   * functions may be used to retrieve information about the file that was written.
+   */
+  static class Writer {
+    /** Set to true once an image has been written */
+    private boolean written = false;
+    
+    /** The MD5 checksum of the file that was written */
+    private MD5Hash writtenDigest;
+
+    static private final byte[] PATH_SEPARATOR = DFSUtil.string2Bytes(Path.SEPARATOR);
+
+    /** @throws IllegalStateException if the instance has not yet written an image */
+    private void checkWritten() {
+      if (!written) {
+        throw new IllegalStateException("FSImageWriter has not written an image");
+      }
+    }
+    
+    /** @throws IllegalStateException if the instance has already written an image */
+    private void checkNotWritten() {
+      if (written) {
+        throw new IllegalStateException("FSImageWriter has already written an image");
+      }
+    }
+
+    /**
+     * Return the MD5 checksum of the image file that was saved.
+     */
+    MD5Hash getWrittenDigest() {
+      checkWritten();
+      return writtenDigest;
+    }
+
+    void write(File newFile,
+               FSNamesystem sourceNamesystem,
+               FSImageCompression compression)
+      throws IOException {
+      checkNotWritten();
+
+      FSDirectory fsDir = sourceNamesystem.dir;
+      long startTime = now();
+      //
+      // Write out data
+      //
+      MessageDigest digester = MD5Hash.getDigester();
+      FileOutputStream fout = new FileOutputStream(newFile);
+      DigestOutputStream fos = new DigestOutputStream(fout, digester);
+      DataOutputStream out = new DataOutputStream(fos);
+      try {
+        out.writeInt(FSConstants.LAYOUT_VERSION);
+        out.writeInt(sourceNamesystem.getFSImage().getNamespaceID()); // TODO bad dependency
+        out.writeLong(fsDir.rootDir.numItemsInTree());
+        out.writeLong(sourceNamesystem.getGenerationStamp());
+
+        // write compression info and set up compressed stream
+        out = compression.writeHeaderAndWrapStream(fos);
+        LOG.info("Saving image file " + newFile +
+                 " using " + compression);
+
+
+        byte[] byteStore = new byte[4*FSConstants.MAX_PATH_LENGTH];
+        ByteBuffer strbuf = ByteBuffer.wrap(byteStore);
+        // save the root
+        FSImageSerialization.saveINode2Image(strbuf, fsDir.rootDir, out);
+        // save the rest of the nodes
+        saveImage(strbuf, 0, fsDir.rootDir, out);
+        sourceNamesystem.saveFilesUnderConstruction(out);
+        sourceNamesystem.saveSecretManagerState(out);
+        strbuf = null;
+
+        out.flush();
+        fout.getChannel().force(true);
+      } finally {
+        out.close();
+      }
+
+      written = true;
+      // set md5 of the saved image
+      writtenDigest = new MD5Hash(digester.digest());
+
+      LOG.info("Image file of size " + newFile.length() + " saved in " 
+          + (now() - startTime)/1000 + " seconds.");
+    }
+
+    /**
+     * Save file tree image starting from the given root.
+     * This is a recursive procedure, which first saves all children of
+     * the current directory and then descends into its sub-directories.
+     */
+    private static void saveImage(ByteBuffer parentPrefix,
+                                  int prefixLength,
+                                  INodeDirectory current,
+                                  DataOutputStream out) throws IOException {
+      int newPrefixLength = prefixLength;
+      if (current.getChildrenRaw() == null)
+        return;
+      for(INode child : current.getChildren()) {
+        // save all children of the current directory first
+        parentPrefix.position(prefixLength);
+        parentPrefix.put(PATH_SEPARATOR).put(child.getLocalNameBytes());
+        FSImageSerialization.saveINode2Image(parentPrefix, child, out);
+      }
+      for(INode child : current.getChildren()) {
+        if(!child.isDirectory())
+          continue;
+        parentPrefix.position(prefixLength);
+        parentPrefix.put(PATH_SEPARATOR).put(child.getLocalNameBytes());
+        newPrefixLength = parentPrefix.position();
+        saveImage(parentPrefix, newPrefixLength, (INodeDirectory)child, out);
+      }
+      parentPrefix.position(prefixLength);
+    }
+  }
+}
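
For reference, a minimal usage sketch of the one-shot Writer introduced above, assuming a caller in the same org.apache.hadoop.hdfs.server.namenode package; the checkpoint file, namesystem instance, and compression codec are placeholder inputs here, not the actual FSImage call site.

package org.apache.hadoop.hdfs.server.namenode;

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.io.MD5Hash;

// Sketch only: drives Writer.write() once and then reads back the image digest.
class SaveImageSketch {
  static MD5Hash saveOnce(File checkpointFile,
                          FSNamesystem namesystem,
                          FSImageCompression compression) throws IOException {
    FSImageFormat.Writer writer = new FSImageFormat.Writer();
    // write() may be invoked only once; a second call throws IllegalStateException
    writer.write(checkpointFile, namesystem, compression);
    // getWrittenDigest() is only valid after write() has completed
    return writer.getWrittenDigest();
  }
}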

+ 288 - 0
src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java

@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DeprecatedUTF8;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Static utility functions for serializing various pieces of data in the correct
+ * format for the FSImage file.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class FSImageSerialization {
+
+  /**
+   * In order to reduce allocation, we reuse some static objects. However, the methods
+   * in this class should be thread-safe since image-saving is multithreaded, so 
+   * we need to keep the static objects in a thread-local.
+   */
+  static private final ThreadLocal<TLData> TL_DATA =
+    new ThreadLocal<TLData>() {
+    @Override
+    protected TLData initialValue() {
+      return new TLData();
+    }
+  };
+
+  /**
+   * Simple container "struct" for threadlocal data.
+   */
+  static private final class TLData {
+    final DeprecatedUTF8 U_STR = new DeprecatedUTF8();
+    final FsPermission FILE_PERM = new FsPermission((short) 0);
+  }
+
+  // Helper function that reads in an INodeUnderConstruction
+  // from the input stream
+  //
+  static INodeFileUnderConstruction readINodeUnderConstruction(
+                            DataInputStream in) throws IOException {
+    byte[] name = readBytes(in);
+    short blockReplication = in.readShort();
+    long modificationTime = in.readLong();
+    long preferredBlockSize = in.readLong();
+    int numBlocks = in.readInt();
+    BlockInfo[] blocks = new BlockInfo[numBlocks];
+    Block blk = new Block();
+    int i = 0;
+    for (; i < numBlocks-1; i++) {
+      blk.readFields(in);
+      blocks[i] = new BlockInfo(blk, blockReplication);
+    }
+    // last block is UNDER_CONSTRUCTION
+    if(numBlocks > 0) {
+      blk.readFields(in);
+      blocks[i] = new BlockInfoUnderConstruction(
+        blk, blockReplication, BlockUCState.UNDER_CONSTRUCTION, null);
+    }
+    PermissionStatus perm = PermissionStatus.read(in);
+    String clientName = readString(in);
+    String clientMachine = readString(in);
+
+    // These locations are not used at all
+    int numLocs = in.readInt();
+    DatanodeDescriptor[] locations = new DatanodeDescriptor[numLocs];
+    for (i = 0; i < numLocs; i++) {
+      locations[i] = new DatanodeDescriptor();
+      locations[i].readFields(in);
+    }
+
+    return new INodeFileUnderConstruction(name, 
+                                          blockReplication, 
+                                          modificationTime,
+                                          preferredBlockSize,
+                                          blocks,
+                                          perm,
+                                          clientName,
+                                          clientMachine,
+                                          null);
+  }
+
+  // Helper function that writes an INodeUnderConstruction
+  // to the output stream
+  //
+  static void writeINodeUnderConstruction(DataOutputStream out,
+                                           INodeFileUnderConstruction cons,
+                                           String path) 
+                                           throws IOException {
+    writeString(path, out);
+    out.writeShort(cons.getReplication());
+    out.writeLong(cons.getModificationTime());
+    out.writeLong(cons.getPreferredBlockSize());
+    int nrBlocks = cons.getBlocks().length;
+    out.writeInt(nrBlocks);
+    for (int i = 0; i < nrBlocks; i++) {
+      cons.getBlocks()[i].write(out);
+    }
+    cons.getPermissionStatus().write(out);
+    writeString(cons.getClientName(), out);
+    writeString(cons.getClientMachine(), out);
+
+    out.writeInt(0); //  do not store locations of last block
+  }
+
+  /*
+   * Save one inode's attributes to the image.
+   */
+  static void saveINode2Image(ByteBuffer name,
+                              INode node,
+                              DataOutputStream out) throws IOException {
+    int nameLen = name.position();
+    out.writeShort(nameLen);
+    out.write(name.array(), name.arrayOffset(), nameLen);
+    FsPermission filePerm = TL_DATA.get().FILE_PERM;
+    if (node.isDirectory()) {
+      out.writeShort(0);  // replication
+      out.writeLong(node.getModificationTime());
+      out.writeLong(0);   // access time
+      out.writeLong(0);   // preferred block size
+      out.writeInt(-1);   // # of blocks
+      out.writeLong(node.getNsQuota());
+      out.writeLong(node.getDsQuota());
+      filePerm.fromShort(node.getFsPermissionShort());
+      PermissionStatus.write(out, node.getUserName(),
+                             node.getGroupName(),
+                             filePerm);
+    } else if (node.isLink()) {
+      out.writeShort(0);  // replication
+      out.writeLong(0);   // modification time
+      out.writeLong(0);   // access time
+      out.writeLong(0);   // preferred block size
+      out.writeInt(-2);   // # of blocks
+      Text.writeString(out, ((INodeSymlink)node).getLinkValue());
+      filePerm.fromShort(node.getFsPermissionShort());
+      PermissionStatus.write(out, node.getUserName(),
+                             node.getGroupName(),
+                             filePerm);      
+    } else {
+      INodeFile fileINode = (INodeFile)node;
+      out.writeShort(fileINode.getReplication());
+      out.writeLong(fileINode.getModificationTime());
+      out.writeLong(fileINode.getAccessTime());
+      out.writeLong(fileINode.getPreferredBlockSize());
+      Block[] blocks = fileINode.getBlocks();
+      out.writeInt(blocks.length);
+      for (Block blk : blocks)
+        blk.write(out);
+      filePerm.fromShort(fileINode.getFsPermissionShort());
+      PermissionStatus.write(out, fileINode.getUserName(),
+                             fileINode.getGroupName(),
+                             filePerm);
+    }
+  }
+
+  // This should be reverted to package private once the ImageLoader
+  // code is moved into this package. This method should not be called
+  // by other code.
+  @SuppressWarnings("deprecation")
+  public static String readString(DataInputStream in) throws IOException {
+    DeprecatedUTF8 ustr = TL_DATA.get().U_STR;
+    ustr.readFields(in);
+    return ustr.toString();
+  }
+
+  static String readString_EmptyAsNull(DataInputStream in) throws IOException {
+    final String s = readString(in);
+    return s.isEmpty()? null: s;
+  }
+
+  @SuppressWarnings("deprecation")
+  static void writeString(String str, DataOutputStream out) throws IOException {
+    DeprecatedUTF8 ustr = TL_DATA.get().U_STR;
+    ustr.set(str);
+    ustr.write(out);
+  }
+
+
+  // Same comments apply for this method as for readString()
+  @SuppressWarnings("deprecation")
+  public static byte[] readBytes(DataInputStream in) throws IOException {
+    DeprecatedUTF8 ustr = TL_DATA.get().U_STR;
+    ustr.readFields(in);
+    int len = ustr.getLength();
+    byte[] bytes = new byte[len];
+    System.arraycopy(ustr.getBytes(), 0, bytes, 0, len);
+    return bytes;
+  }
+
+  /**
+   * Reads the path from the image and converts it to byte[][] directly;
+   * this saves us an array copy and conversions to and from String.
+   * @param in input stream to read the path from
+   * @return the array, each element of which is a byte[] representation
+   *            of a path component
+   * @throws IOException
+   */
+  @SuppressWarnings("deprecation")
+  public static byte[][] readPathComponents(DataInputStream in)
+      throws IOException {
+    DeprecatedUTF8 ustr = TL_DATA.get().U_STR;
+    
+    ustr.readFields(in);
+    return DFSUtil.bytes2byteArray(ustr.getBytes(),
+      ustr.getLength(), (byte) Path.SEPARATOR_CHAR);
+  }
+
+  /**
+   * DatanodeImage is used to store persistent information
+   * about datanodes in the fsImage.
+   */
+  static class DatanodeImage implements Writable {
+    DatanodeDescriptor node = new DatanodeDescriptor();
+
+    static void skipOne(DataInput in) throws IOException {
+      DatanodeImage nodeImage = new DatanodeImage();
+      nodeImage.readFields(in);
+    }
+
+    /////////////////////////////////////////////////
+    // Writable
+    /////////////////////////////////////////////////
+    /**
+     * Public method that serializes the information about a
+     * Datanode to be stored in the fsImage.
+     */
+    public void write(DataOutput out) throws IOException {
+      new DatanodeID(node).write(out);
+      out.writeLong(node.getCapacity());
+      out.writeLong(node.getRemaining());
+      out.writeLong(node.getLastUpdate());
+      out.writeInt(node.getXceiverCount());
+    }
+
+    /**
+     * Public method that reads a serialized Datanode
+     * from the fsImage.
+     */
+    public void readFields(DataInput in) throws IOException {
+      DatanodeID id = new DatanodeID();
+      id.readFields(in);
+      long capacity = in.readLong();
+      long remaining = in.readLong();
+      long lastUpdate = in.readLong();
+      int xceiverCount = in.readInt();
+
+      // update the DatanodeDescriptor with the data we read in
+      node.updateRegInfo(id);
+      node.setStorageID(id.getStorageID());
+      node.setCapacity(capacity);
+      node.setRemaining(remaining);
+      node.setLastUpdate(lastUpdate);
+      node.setXceiverCount(xceiverCount);
+    }
+  }
+}
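
As a worked illustration of the readPathComponents() optimization above, the following hedged sketch (assumed to live in the same package, since writeString() is package-private) round-trips a path through the thread-local DeprecatedUTF8 buffer; the example path "/user/alice/data" is arbitrary.

package org.apache.hadoop.hdfs.server.namenode;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Sketch only: shows the byte[][] result without building intermediate Strings.
class PathComponentsSketch {
  static byte[][] demo() throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    FSImageSerialization.writeString("/user/alice/data", new DataOutputStream(buf));

    DataInputStream in =
        new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
    // Splits the raw UTF-8 bytes on '/' into per-component byte[] arrays,
    // avoiding the String allocation a split-on-String approach would need.
    return FSImageSerialization.readPathComponents(in);
  }
}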

+ 1 - 1
src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -4780,7 +4780,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
                                   " but is not under construction.");
           }
           INodeFileUnderConstruction cons = (INodeFileUnderConstruction) node;
-          FSImage.writeINodeUnderConstruction(out, cons, path);
+          FSImageSerialization.writeINodeUnderConstruction(out, cons, path);
         }
       }
     }

+ 7 - 6
src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
+import org.apache.hadoop.hdfs.server.namenode.FSImageSerialization;
 import org.apache.hadoop.hdfs.tools.offlineImageViewer.ImageVisitor.ImageElement;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableUtils;
@@ -234,7 +235,7 @@ class ImageLoaderCurrent implements ImageLoader {
 
     for(int i = 0; i < numINUC; i++) {
       v.visitEnclosingElement(ImageElement.INODE_UNDER_CONSTRUCTION);
-      byte [] name = FSImage.readBytes(in);
+      byte [] name = FSImageSerialization.readBytes(in);
       String n = new String(name, "UTF8");
       v.visit(ImageElement.INODE_PATH, n);
       v.visit(ImageElement.REPLICATION, in.readShort());
@@ -245,8 +246,8 @@ class ImageLoaderCurrent implements ImageLoader {
       processBlocks(in, v, numBlocks, skipBlocks);
 
       processPermission(in, v);
-      v.visit(ImageElement.CLIENT_NAME, FSImage.readString(in));
-      v.visit(ImageElement.CLIENT_MACHINE, FSImage.readString(in));
+      v.visit(ImageElement.CLIENT_NAME, FSImageSerialization.readString(in));
+      v.visit(ImageElement.CLIENT_MACHINE, FSImageSerialization.readString(in));
 
       // Skip over the datanode descriptors, which are still stored in the
       // file but are not used by the datanode or loaded into memory
@@ -257,8 +258,8 @@ class ImageLoaderCurrent implements ImageLoader {
         in.readLong();
         in.readLong();
         in.readInt();
-        FSImage.readString(in);
-        FSImage.readString(in);
+        FSImageSerialization.readString(in);
+        FSImageSerialization.readString(in);
         WritableUtils.readEnum(in, AdminStates.class);
       }
 
@@ -336,7 +337,7 @@ class ImageLoaderCurrent implements ImageLoader {
 
     for(long i = 0; i < numInodes; i++) {
       v.visitEnclosingElement(ImageElement.INODE);
-      v.visit(ImageElement.INODE_PATH, FSImage.readString(in));
+      v.visit(ImageElement.INODE_PATH, FSImageSerialization.readString(in));
       v.visit(ImageElement.REPLICATION, in.readShort());
       v.visit(ImageElement.MODIFICATION_TIME, formatDate(in.readLong()));
       if(imageVersion <= -17) // added in version -17

+ 10 - 0
src/test/findbugsExcludeFile.xml

@@ -225,4 +225,14 @@
        <Method name="run" />
        <Bug pattern="REC_CATCH_EXCEPTION" />
      </Match>
+
+     <!--
+      Findbugs doesn't realize that closing a FilterOutputStream pushes the close down to
+      wrapped streams, too.
+     -->
+     <Match>
+       <Class name="org.apache.hadoop.hdfs.server.namenode.FSImageFormat$Writer" />
+       <Method name="write" />
+       <Bug pattern="OS_OPEN_STREAM" />
+     </Match>
  </FindBugsFilter>
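
For completeness, a hedged sketch of the stream-close behaviour the exclusion above relies on: DataOutputStream and DigestOutputStream are both FilterOutputStream subclasses, so closing only the outermost stream (as Writer.write() does in its finally block) also closes the wrapped streams; the File argument below is a placeholder.

import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Sketch only: closing 'out' flushes and closes 'fos' and 'fout' as well.
class ClosePropagationSketch {
  static void demo(File placeholder) throws IOException, NoSuchAlgorithmException {
    FileOutputStream fout = new FileOutputStream(placeholder);
    DigestOutputStream fos =
        new DigestOutputStream(fout, MessageDigest.getInstance("MD5"));
    DataOutputStream out = new DataOutputStream(fos);
    try {
      out.writeInt(42);
    } finally {
      out.close(); // FilterOutputStream.close() propagates down to the FileOutputStream
    }
  }
}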