
Merging changes r1040026:r1044166 from trunk to federation branch

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-1052@1078938 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas 14 years ago
parent
commit
2b2f4ff067
28 changed files with 791 additions and 317 deletions
  1. CHANGES.txt (+26 -0)
  2. src/java/org/apache/hadoop/fs/Hdfs.java (+3 -3)
  3. src/java/org/apache/hadoop/hdfs/CorruptFileBlockIterator.java (+106 -0)
  4. src/java/org/apache/hadoop/hdfs/DFSClient.java (+1 -1)
  5. src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (+2 -4)
  6. src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (+0 -1)
  7. src/java/org/apache/hadoop/hdfs/protocol/CorruptFileBlocks.java (+108 -0)
  8. src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (+5 -5)
  9. src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (+8 -1)
  10. src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (+4 -1)
  11. src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfo.java (+9 -0)
  12. src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (+22 -38)
  13. src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (+6 -5)
  14. src/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java (+5 -5)
  15. src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (+171 -131)
  16. src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java (+8 -1)
  17. src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (+8 -1)
  18. src/java/org/apache/hadoop/hdfs/server/namenode/INode.java (+40 -0)
  19. src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java (+52 -25)
  20. src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (+1 -1)
  21. src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (+1 -2)
  22. src/test/findbugsExcludeFile.xml (+2 -2)
  23. src/test/hdfs/org/apache/hadoop/hdfs/TestLargeBlock.java (+58 -57)
  24. src/test/hdfs/org/apache/hadoop/hdfs/protocol/TestCorruptFileBlocks.java (+79 -0)
  25. src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java (+1 -1)
  26. src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java (+43 -12)
  27. src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSaveNamespace.java (+3 -9)
  28. src/test/unit/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java (+19 -11)

+ 26 - 0
CHANGES.txt

@@ -232,6 +232,11 @@ Trunk (unreleased changes)
 
     HDFS-1481. NameNode should validate fsimage before rolling. (hairong)
 
+    HDFS-1506. Refactor fsimage loading code. (hairong)
+
+    HDFS-1533. A more elegant FileSystem#listCorruptFileBlocks API
+    (HDFS portion) (Patrick Kling via hairong)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
@@ -650,6 +655,27 @@ Release 0.22.0 - Unreleased
     HDFS-1167. New property for local conf directory in system-test-hdfs.xml
     file. (Vinay Thota via cos)
 
+    HDFS-1503. TestSaveNamespace fails. (Todd Lipcon via cos)
+
+    HDFS-1524. Image loader should make sure to read every byte in image file.
+    (hairong)
+
+    HDFS-1523. TestLargeBlock is failing on trunk. (cos)
+
+    HDFS-1502. TestBlockRecovery triggers NPE in assert. (hairong via cos)
+
+    HDFS-1532. Exclude Findbugs warning in FSImageFormat$Saver. (Todd Lipcon
+    via cos)
+
+    HDFS-1527. SocketOutputStream.transferToFully fails for blocks >= 2GB on
+    32 bit JVM. (Patrick Kling via cos)
+
+    HDFS-1531. Clean up stack traces due to duplicate MXBean registration.
+    (Todd Lipcon via cos)
+
+    HDFS-613. TestBalancer and TestBlockTokenWithDFS fail Balancer assert.
+    (Todd Lipcon via cos)
+
 Release 0.21.1 - Unreleased
 
     HDFS-1411. Correct backup node startup command in hdfs user guide.

+ 3 - 3
src/java/org/apache/hadoop/fs/Hdfs.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.CorruptFileBlockIterator;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
@@ -307,10 +308,9 @@ public class Hdfs extends AbstractFileSystem {
    * {@inheritDoc}
    */
   @Override
-  public CorruptFileBlocks listCorruptFileBlocks(String path,
-                                                 String cookie)
+  public RemoteIterator<Path> listCorruptFileBlocks(Path path)
     throws IOException {
-    return dfs.listCorruptFileBlocks(path, cookie);
+    return new CorruptFileBlockIterator(dfs, path);
   }
 
   @Override

+ 106 - 0
src/java/org/apache/hadoop/hdfs/CorruptFileBlockIterator.java

@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
+/**
+ * Provides an iterator interface for listCorruptFileBlocks.
+ * This class is used by DistributedFileSystem and Hdfs.
+ */
+public class CorruptFileBlockIterator implements RemoteIterator<Path> {
+  private final DFSClient dfs;
+  private String path;
+
+  private String[] files = null;
+  private int fileIdx = 0;
+  private String cookie = null;
+  private Path nextPath = null;
+
+  private int callsMade = 0;
+
+  public CorruptFileBlockIterator(DFSClient dfs, Path path) throws IOException {
+    this.dfs = dfs;
+    this.path = path2String(path);
+    loadNext();
+  }
+
+  /**
+   * @return the number of calls made to the DFSClient.
+   * This is for debugging and testing purposes.
+   */
+  public int getCallsMade() {
+    return callsMade;
+  }
+
+  private String path2String(Path path) {
+    return path.toUri().getPath();
+  }
+
+  private Path string2Path(String string) {
+    return new Path(string);
+  }
+
+  private void loadNext() throws IOException {
+    if (files == null || fileIdx >= files.length) {
+      CorruptFileBlocks cfb = dfs.listCorruptFileBlocks(path, cookie);
+      files = cfb.getFiles();
+      cookie = cfb.getCookie();
+      fileIdx = 0;
+      callsMade++;
+    }
+
+    if (fileIdx >= files.length) {
+      // received an empty response
+      // there are no more corrupt file blocks
+      nextPath = null;
+    } else {
+      nextPath = string2Path(files[fileIdx]);
+      fileIdx++;
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean hasNext() {
+    return nextPath != null;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public Path next() throws IOException {
+    if (!hasNext()) {
+      throw new NoSuchElementException("No more corrupt file blocks");
+    }
+
+    Path result = nextPath;
+    loadNext();
+
+    return result;
+  }
+}

+ 1 - 1
src/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -61,12 +61,12 @@ import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
-import org.apache.hadoop.fs.CorruptFileBlocks;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;

+ 2 - 4
src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java

@@ -43,7 +43,6 @@ import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.fs.CorruptFileBlocks;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -606,10 +605,9 @@ public class DistributedFileSystem extends FileSystem {
    * {@inheritDoc}
    */
   @Override
-  public CorruptFileBlocks listCorruptFileBlocks(String path,
-                                                 String cookie)
+  public RemoteIterator<Path> listCorruptFileBlocks(Path path)
     throws IOException {
-    return dfs.listCorruptFileBlocks(path, cookie);
+    return new CorruptFileBlockIterator(dfs, path);
   }
 
   /** Return statistics for each datanode. */

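For context (not part of the diff): a minimal sketch of how client code might consume the new iterator-based listCorruptFileBlocks API shown above. The class name ListCorruptExample and the path /user are illustrative assumptions, not taken from the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class ListCorruptExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    if (fs instanceof DistributedFileSystem) {
      DistributedFileSystem dfs = (DistributedFileSystem) fs;
      // Paging via the cookie is handled internally by CorruptFileBlockIterator.
      RemoteIterator<Path> it = dfs.listCorruptFileBlocks(new Path("/user")); // illustrative path
      while (it.hasNext()) {
        System.out.println("corrupt: " + it.next());
      }
    }
  }
}
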
+ 0 - 1
src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java

@@ -32,7 +32,6 @@ import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.fs.UnresolvedLinkException;
-import org.apache.hadoop.fs.CorruptFileBlocks;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;

+ 108 - 0
src/java/org/apache/hadoop/hdfs/protocol/CorruptFileBlocks.java

@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.Text;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * Contains a list of paths corresponding to corrupt files and a cookie
+ * used for iterative calls to NameNode.listCorruptFileBlocks.
+ *
+ */
+public class CorruptFileBlocks implements Writable {
+  // used for hashCode
+  private static final int PRIME = 16777619;
+
+  private String[] files;
+  private String cookie;
+
+  public CorruptFileBlocks() {
+    this(new String[0], "");
+  }
+
+  public CorruptFileBlocks(String[] files, String cookie) {
+    this.files = files;
+    this.cookie = cookie;
+  }
+
+  public String[] getFiles() {
+    return files;
+  }
+
+  public String getCookie() {
+    return cookie;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int fileCount = in.readInt();
+    files = new String[fileCount];
+    for (int i = 0; i < fileCount; i++) {
+      files[i] = Text.readString(in);
+    }
+    cookie = Text.readString(in);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(files.length);
+    for (int i = 0; i < files.length; i++) {
+      Text.writeString(out, files[i]);
+    }
+    Text.writeString(out, cookie);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof CorruptFileBlocks)) {
+      return false;
+    }
+    CorruptFileBlocks other = (CorruptFileBlocks) obj;
+    return cookie.equals(other.cookie) &&
+      Arrays.equals(files, other.files);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public int hashCode() {
+    int result = cookie.hashCode();
+
+    for (String file : files) {
+      result = PRIME * result + file.hashCode();
+    }
+
+    return result;
+  }
+}

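For context (not part of the diff): a rough sketch of the cookie-based paging pattern that CorruptFileBlocks enables; CorruptFileBlockIterator above wraps essentially this loop. The helper class name CorruptFilePager is hypothetical, and an already constructed DFSClient is assumed.

import java.io.IOException;

import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;

class CorruptFilePager {
  // Prints every corrupt file path under 'path', following the cookie
  // returned by each call until the NameNode sends back an empty batch.
  static void printCorruptFiles(DFSClient dfs, String path) throws IOException {
    String cookie = null;
    String[] files;
    do {
      CorruptFileBlocks cfb = dfs.listCorruptFileBlocks(path, cookie);
      files = cfb.getFiles();
      cookie = cfb.getCookie();
      for (String f : files) {
        System.out.println("corrupt: " + f);
      }
    } while (files.length > 0);
  }
}
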
+ 5 - 5
src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java

@@ -886,11 +886,11 @@ public class Balancer {
     // Then match nodes on different racks
     chooseNodes(false);
     
-    assert (datanodes.size() == 
-      overUtilizedDatanodes.size()+underUtilizedDatanodes.size()+
-      aboveAvgUtilizedDatanodes.size()+belowAvgUtilizedDatanodes.size()+
-      sources.size()+targets.size())
-      : "Mismatched number of datanodes";
+    assert (datanodes.size() >= sources.size()+targets.size())
+      : "Mismatched number of datanodes (" +
+      datanodes.size() + " total, " +
+      sources.size() + " sources, " +
+      targets.size() + " targets)";
 
     long bytesToMove = 0L;
     for (Source src : sources) {

+ 8 - 1
src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java

@@ -54,6 +54,8 @@ class BlockSender implements java.io.Closeable, FSConstants {
   /** The visible length of a replica. */
   private final long replicaVisibleLength;
 
+  private static final boolean is32Bit = System.getProperty("sun.arch.data.model").equals("32");
+
   private InputStream blockIn; // data stream
   private long blockInPosition = -1; // updated while using transferTo().
   private DataInputStream checksumIn; // checksum datastream
@@ -144,7 +146,11 @@ class BlockSender implements java.io.Closeable, FSConstants {
       this.chunkOffsetOK = chunkOffsetOK;
       this.corruptChecksumOk = corruptChecksumOk;
       this.verifyChecksum = verifyChecksum;
-      this.transferToAllowed = datanode.transferToAllowed;
+
+      // transferToFully() fails on 32 bit platforms for block sizes >= 2GB,
+      // use normal transfer in those cases
+      this.transferToAllowed = datanode.transferToAllowed &&
+        (!is32Bit || length < (long) Integer.MAX_VALUE);
       this.clientTraceFmt = clientTraceFmt;
 
       if ( !corruptChecksumOk || datanode.data.metaFileExists(block) ) {
@@ -397,6 +403,7 @@ class BlockSender implements java.io.Closeable, FSConstants {
       /* exception while writing to the client (well, with transferTo(),
        * it could also be while reading from the local file).
        */
+      LOG.error("BlockSender.sendChunks() exception: " + StringUtils.stringifyException(e));
       throw ioeToSocketException(e);
     }
 

+ 4 - 1
src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -1403,8 +1403,11 @@ public class DataNode extends Configured
     try {
       ObjectName mxbeanName = new ObjectName("HadoopInfo:type=DataNodeInfo");
       mbs.registerMBean(this, mxbeanName);
+    } catch ( javax.management.InstanceAlreadyExistsException iaee ) {
+      // in unit tests, we may have multiple datanodes in the same JVM
+      LOG.info("DataNode MXBean already registered");
     } catch ( javax.management.JMException e ) {
-      LOG.warn("Failed to register NameNode MXBean", e);
+      LOG.warn("Failed to register DataNode MXBean", e);
     }
   }
   

+ 9 - 0
src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfo.java

@@ -40,6 +40,15 @@ class BlockInfo extends Block implements LightWeightGSet.LinkedElement {
    */
   private Object[] triplets;
 
+  /**
+   * Construct an entry for blocksmap
+   * @param replication the block's replication factor
+   */
+  protected BlockInfo(int replication) {
+    this.triplets = new Object[3*replication];
+    this.inode = null;
+  }
+  
   protected BlockInfo(Block blk, int replication) {
     super(blk);
     this.triplets = new Object[3*replication];

+ 22 - 38
src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -287,39 +287,9 @@ class FSDirectory implements Closeable {
 
   }
 
-  INodeDirectory addToParent( byte[][] src,
-                              INodeDirectory parentINode,
-                              PermissionStatus permissions,
-                              Block[] blocks, 
-                              String symlink,
-                              short replication,
-                              long modificationTime,
-                              long atime,
-                              long nsQuota,
-                              long dsQuota,
-                              long preferredBlockSize,
-                              boolean propagateModTime) 
-                              throws UnresolvedLinkException {
+  INodeDirectory addToParent(byte[] src, INodeDirectory parentINode,
+      INode newNode, boolean propagateModTime) throws UnresolvedLinkException {
     // NOTE: This does not update space counts for parents
-    // create new inode
-    INode newNode;
-    if (blocks == null) {
-      if (nsQuota >= 0 || dsQuota >= 0) {
-        newNode = new INodeDirectoryWithQuota(
-            permissions, modificationTime, nsQuota, dsQuota);
-      } else {
-        newNode = new INodeDirectory(permissions, modificationTime);
-      }
-    } else  {
-      if (symlink.length() != 0) {
-        newNode = new INodeSymlink(symlink, modificationTime, atime, permissions);
-        ((INodeSymlink)newNode).setLinkValue(symlink);
-      } else {
-        newNode = new INodeFile(permissions, blocks.length, replication,
-                                modificationTime, atime, preferredBlockSize);
-      }
-    }
-    // add new node to the parent
     INodeDirectory newParent = null;
     writeLock();
     try {
@@ -332,14 +302,12 @@ class FSDirectory implements Closeable {
       }
       if(newParent == null)
         return null;
-      if(blocks != null) {
-        int nrBlocks = blocks.length;
+      if(!newNode.isDirectory() && !newNode.isLink()) {
         // Add file->block mapping
-        assert !newNode.isLink();
         INodeFile newF = (INodeFile)newNode;
-        for (int i = 0; i < nrBlocks; i++) {
-          BlockInfo blockInfo = new BlockInfo(blocks[i], newF.getReplication());
-          newF.setBlock(i, getBlockManager().addINode(blockInfo, newF));
+        BlockInfo[] blocks = newF.getBlocks();
+        for (int i = 0; i < blocks.length; i++) {
+          newF.setBlock(i, getBlockManager().addINode(blocks[i], newF));
         }
       }
     } finally {
@@ -1278,6 +1246,22 @@ class FSDirectory implements Closeable {
     }
   }
   
+  /**
+   * Get the parent node of path.
+   * 
+   * @param path the path to explore
+   * @return its parent node
+   */
+  INodeDirectory getParent(byte[][] path) 
+    throws FileNotFoundException, UnresolvedLinkException {
+    readLock();
+    try {
+      return rootDir.getParent(path);
+    } finally {
+      readUnlock();
+    }
+  }
+  
   /** 
    * Check whether the filepath could be created
    */

+ 6 - 5
src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java

@@ -1106,8 +1106,9 @@ public class FSImage extends Storage {
    * "re-save" and consolidate the edit-logs
    */
   boolean loadFSImage(File curFile) throws IOException {
-    FSImageFormat.Loader loader = new FSImageFormat.Loader(conf);
-    loader.load(curFile, getFSNamesystem());
+    FSImageFormat.Loader loader = new FSImageFormat.Loader(
+        conf, getFSNamesystem());
+    loader.load(curFile);
 
     namesystem.setBlockPoolId(this.getBlockPoolID());
 
@@ -1164,10 +1165,10 @@ public class FSImage extends Storage {
    * Save the contents of the FS image to the file.
    */
   void saveFSImage(File newFile) throws IOException {
-    FSImageFormat.Writer writer = new FSImageFormat.Writer();
+    FSImageFormat.Saver saver = new FSImageFormat.Saver();
     FSImageCompression compression = FSImageCompression.createCompression(conf);
-    writer.write(newFile, getFSNamesystem(), compression);
-    setImageDigest(writer.getWrittenDigest());
+    saver.save(newFile, getFSNamesystem(), compression);
+    setImageDigest(saver.getSavedDigest());
   }
 
   public void setImageDigest(MD5Hash digest) {

+ 5 - 5
src/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java

@@ -60,7 +60,7 @@ class FSImageCompression {
   /**
    * Create a "noop" compression - i.e. uncompressed
    */
-  public static FSImageCompression createNoopCompression() {
+  static FSImageCompression createNoopCompression() {
     return new FSImageCompression();
   }
 
@@ -69,7 +69,7 @@ class FSImageCompression {
    * Configuration object.
    * @throws IOException if the specified codec is not available.
    */
-  public static FSImageCompression createCompression(Configuration conf)
+  static FSImageCompression createCompression(Configuration conf)
     throws IOException {
     boolean compressImage = conf.getBoolean(
       DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY,
@@ -107,7 +107,7 @@ class FSImageCompression {
    * @throws IOException if the specified codec is not available or the
    * underlying IO fails.
    */
-  public static FSImageCompression readCompressionHeader(
+  static FSImageCompression readCompressionHeader(
     Configuration conf,
     DataInputStream dis) throws IOException
   {
@@ -129,7 +129,7 @@ class FSImageCompression {
    * @throws IOException If the decompressor cannot be instantiated or an IO
    * error occurs.
    */
-  public DataInputStream unwrapInputStream(InputStream is) throws IOException {
+  DataInputStream unwrapInputStream(InputStream is) throws IOException {
     if (imageCodec != null) {
       return new DataInputStream(imageCodec.createInputStream(is));
     } else {
@@ -150,7 +150,7 @@ class FSImageCompression {
    * @throws IOException if an IO error occurs or the compressor cannot be
    * instantiated
    */
-  public DataOutputStream writeHeaderAndWrapStream(OutputStream os)
+  DataOutputStream writeHeaderAndWrapStream(OutputStream os)
   throws IOException {
     DataOutputStream dos = new DataOutputStream(os);
 

+ 171 - 131
src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java

@@ -32,29 +32,37 @@ import java.security.MessageDigest;
 import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.Text;
 
 /**
- * Contains inner classes for reading or writing the on-disk format for FSImages
+ * Contains inner classes for reading or writing the on-disk format for FSImages.
  */
-public abstract class FSImageFormat {
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+class FSImageFormat {
   private static final Log LOG = FSImage.LOG;
   
+  // Static-only class
+  private FSImageFormat() {}
+  
   /**
    * A one-shot class responsible for loading an image. The load() function
    * should be called once, after which the getter methods may be used to retrieve
    * information about the image that was loaded, if loading was successful.
    */
-  public static class Loader {
+  static class Loader {
     private final Configuration conf;
+    /** which namesystem this loader is working for */
+    private final FSNamesystem namesystem;
 
     /** Set to true once a file has been loaded using this loader. */
     private boolean loaded = false;
@@ -66,8 +74,9 @@ public abstract class FSImageFormat {
     /** The MD5 sum of the loaded file */
     private MD5Hash imgDigest;
 
-    public Loader(Configuration conf) {
+    Loader(Configuration conf, FSNamesystem namesystem) {
       this.conf = conf;
+      this.namesystem = namesystem;
     }
 
     /**
@@ -115,14 +124,13 @@ public abstract class FSImageFormat {
       }
     }
 
-    void load(File curFile, FSNamesystem targetNamesystem)
+    void load(File curFile)
       throws IOException
     {
       checkNotLoaded();
       assert curFile != null : "curFile is null";
 
       long startTime = now();
-      FSDirectory fsDir = targetNamesystem.dir;
 
       //
       // Load in bits
@@ -155,7 +163,7 @@ public abstract class FSImageFormat {
         // read in the last generation stamp.
         if (imgVersion <= -12) {
           long genstamp = in.readLong();
-          targetNamesystem.setGenerationStamp(genstamp); 
+          namesystem.setGenerationStamp(genstamp); 
         }
 
         // read compression related info
@@ -169,110 +177,21 @@ public abstract class FSImageFormat {
 
         LOG.info("Loading image file " + curFile + " using " + compression);
 
-
-        // read file info
-        short replication = targetNamesystem.getDefaultReplication();
-
+        // load all inodes
         LOG.info("Number of files = " + numFiles);
-
-        byte[][] pathComponents;
-        byte[][] parentPath = {{}};
-        INodeDirectory parentINode = fsDir.rootDir;
-        for (long i = 0; i < numFiles; i++) {
-          long modificationTime = 0;
-          long atime = 0;
-          long blockSize = 0;
-          pathComponents = FSImageSerialization.readPathComponents(in);
-          replication = in.readShort();
-          replication = targetNamesystem.adjustReplication(replication);
-          modificationTime = in.readLong();
-          if (imgVersion <= -17) {
-            atime = in.readLong();
-          }
-          if (imgVersion <= -8) {
-            blockSize = in.readLong();
-          }
-          int numBlocks = in.readInt();
-          Block blocks[] = null;
-
-          // for older versions, a blocklist of size 0
-          // indicates a directory.
-          if ((-9 <= imgVersion && numBlocks > 0) ||
-              (imgVersion < -9 && numBlocks >= 0)) {
-            blocks = new Block[numBlocks];
-            for (int j = 0; j < numBlocks; j++) {
-              blocks[j] = new Block();
-              if (-14 < imgVersion) {
-                blocks[j].set(in.readLong(), in.readLong(), 
-                              GenerationStamp.GRANDFATHER_GENERATION_STAMP);
-              } else {
-                blocks[j].readFields(in);
-              }
-            }
-          }
-          // Older versions of HDFS does not store the block size in inode.
-          // If the file has more than one block, use the size of the 
-          // first block as the blocksize. Otherwise use the default block size.
-          //
-          if (-8 <= imgVersion && blockSize == 0) {
-            if (numBlocks > 1) {
-              blockSize = blocks[0].getNumBytes();
-            } else {
-              long first = ((numBlocks == 1) ? blocks[0].getNumBytes(): 0);
-              blockSize = Math.max(targetNamesystem.getDefaultBlockSize(), first);
-            }
-          }
-          
-          // get quota only when the node is a directory
-          long nsQuota = -1L;
-          if (imgVersion <= -16 && blocks == null  && numBlocks == -1) {
-            nsQuota = in.readLong();
-          }
-          long dsQuota = -1L;
-          if (imgVersion <= -18 && blocks == null && numBlocks == -1) {
-            dsQuota = in.readLong();
-          }
-
-          // Read the symlink only when the node is a symlink
-          String symlink = "";
-          if (imgVersion <= -23 && numBlocks == -2) {
-            symlink = Text.readString(in);
-          }
-          
-          PermissionStatus permissions = targetNamesystem.getUpgradePermission();
-          if (imgVersion <= -11) {
-            permissions = PermissionStatus.read(in);
-          }
-          
-          if (isRoot(pathComponents)) { // it is the root
-            // update the root's attributes
-            if (nsQuota != -1 || dsQuota != -1) {
-              fsDir.rootDir.setQuota(nsQuota, dsQuota);
-            }
-            fsDir.rootDir.setModificationTime(modificationTime);
-            fsDir.rootDir.setPermissionStatus(permissions);
-            continue;
-          }
-          // check if the new inode belongs to the same parent
-          if(!isParent(pathComponents, parentPath)) {
-            parentINode = null;
-            parentPath = getParent(pathComponents);
-          }
-          // add new inode
-          // without propagating modification time to parent
-          parentINode = fsDir.addToParent(pathComponents, parentINode, permissions,
-                                          blocks, symlink, replication, modificationTime, 
-                                          atime, nsQuota, dsQuota, blockSize, false);
-        }
+        loadFullNameINodes(numFiles, in);
 
         // load datanode info
         this.loadDatanodes(in);
 
         // load Files Under Construction
-        this.loadFilesUnderConstruction(in, targetNamesystem);
+        this.loadFilesUnderConstruction(in);
 
-        this.loadSecretManagerState(in, targetNamesystem);
+        this.loadSecretManagerState(in);
 
+        // make sure to read to the end of file
+        int eof = in.read();
+        assert eof == -1 : "Should have reached the end of image file " + curFile;
       } finally {
         in.close();
       }
@@ -284,6 +203,128 @@ public abstract class FSImageFormat {
           + (now() - startTime)/1000 + " seconds.");
     }
 
+  /** Update the root node's attributes */
+  private void updateRootAttr(INode root) {                                                           
+    long nsQuota = root.getNsQuota();
+    long dsQuota = root.getDsQuota();
+    FSDirectory fsDir = namesystem.dir;
+    if (nsQuota != -1 || dsQuota != -1) {
+      fsDir.rootDir.setQuota(nsQuota, dsQuota);
+    }
+    fsDir.rootDir.setModificationTime(root.getModificationTime());
+    fsDir.rootDir.setPermissionStatus(root.getPermissionStatus());    
+  }
+
+  /**
+   * load fsimage files assuming full path names are stored
+   * 
+   * @param numFiles total number of files to load
+   * @param in data input stream
+   * @throws IOException if any error occurs
+   */
+  private void loadFullNameINodes(long numFiles,
+      DataInputStream in) throws IOException {
+    byte[][] pathComponents;
+    byte[][] parentPath = {{}};      
+    FSDirectory fsDir = namesystem.dir;
+    INodeDirectory parentINode = fsDir.rootDir;
+    for (long i = 0; i < numFiles; i++) {
+      pathComponents = FSImageSerialization.readPathComponents(in);
+      INode newNode = loadINode(in);
+
+      if (isRoot(pathComponents)) { // it is the root
+        // update the root's attributes
+        updateRootAttr(newNode);
+        continue;
+      }
+      // check if the new inode belongs to the same parent
+      if(!isParent(pathComponents, parentPath)) {
+        parentINode = fsDir.getParent(pathComponents);
+        parentPath = getParent(pathComponents);
+      }
+
+      // add new inode
+      parentINode = fsDir.addToParent(pathComponents[pathComponents.length-1], 
+          parentINode, newNode, false);
+    }
+  }
+
+  /**
+   * load an inode from fsimage except for its name
+   * 
+   * @param in data input stream from which image is read
+   * @return an inode
+   */
+  private INode loadINode(DataInputStream in)
+  throws IOException {
+    long modificationTime = 0;
+    long atime = 0;
+    long blockSize = 0;
+    
+    short replication = in.readShort();
+    replication = namesystem.adjustReplication(replication);
+    modificationTime = in.readLong();
+    if (imgVersion <= -17) {
+      atime = in.readLong();
+    }
+    if (imgVersion <= -8) {
+      blockSize = in.readLong();
+    }
+    int numBlocks = in.readInt();
+    BlockInfo blocks[] = null;
+
+    // for older versions, a blocklist of size 0
+    // indicates a directory.
+    if ((-9 <= imgVersion && numBlocks > 0) ||
+        (imgVersion < -9 && numBlocks >= 0)) {
+      blocks = new BlockInfo[numBlocks];
+      for (int j = 0; j < numBlocks; j++) {
+        blocks[j] = new BlockInfo(replication);
+        if (-14 < imgVersion) {
+          blocks[j].set(in.readLong(), in.readLong(), 
+                        GenerationStamp.GRANDFATHER_GENERATION_STAMP);
+        } else {
+          blocks[j].readFields(in);
+        }
+      }
+    }
+    // Older versions of HDFS does not store the block size in inode.
+    // If the file has more than one block, use the size of the 
+    // first block as the blocksize. Otherwise use the default block size.
+    //
+    if (-8 <= imgVersion && blockSize == 0) {
+      if (numBlocks > 1) {
+        blockSize = blocks[0].getNumBytes();
+      } else {
+        long first = ((numBlocks == 1) ? blocks[0].getNumBytes(): 0);
+        blockSize = Math.max(namesystem.getDefaultBlockSize(), first);
+      }
+    }
+    
+    // get quota only when the node is a directory
+    long nsQuota = -1L;
+    if (imgVersion <= -16 && blocks == null  && numBlocks == -1) {
+        nsQuota = in.readLong();
+      }
+      long dsQuota = -1L;
+      if (imgVersion <= -18 && blocks == null && numBlocks == -1) {
+        dsQuota = in.readLong();
+      }
+  
+      // Read the symlink only when the node is a symlink
+      String symlink = "";
+      if (imgVersion <= -23 && numBlocks == -2) {
+        symlink = Text.readString(in);
+      }
+      
+      PermissionStatus permissions = namesystem.getUpgradePermission();
+      if (imgVersion <= -11) {
+        permissions = PermissionStatus.read(in);
+      }
+  
+      return INode.newINode(permissions, blocks, symlink, replication,
+          modificationTime, atime, nsQuota, dsQuota, blockSize);
+    }
 
     private void loadDatanodes(DataInputStream in) throws IOException {
       if (imgVersion > -3) // pre datanode image version
@@ -298,9 +339,9 @@ public abstract class FSImageFormat {
       }
     }
 
-    private void loadFilesUnderConstruction(DataInputStream in, 
-        FSNamesystem fs) throws IOException {
-      FSDirectory fsDir = fs.dir;
+    private void loadFilesUnderConstruction(DataInputStream in)
+    throws IOException {
+      FSDirectory fsDir = namesystem.dir;
       if (imgVersion > -13) // pre lease image version
         return;
       int size = in.readInt();
@@ -322,18 +363,17 @@ public abstract class FSImageFormat {
         }
         INodeFile oldnode = (INodeFile) old;
         fsDir.replaceNode(path, oldnode, cons);
-        fs.leaseManager.addLease(cons.getClientName(), path); 
+        namesystem.leaseManager.addLease(cons.getClientName(), path); 
       }
     }
 
-    private void loadSecretManagerState(DataInputStream in, 
-        FSNamesystem fs) throws IOException {
+    private void loadSecretManagerState(DataInputStream in) throws IOException {
       if (imgVersion > -23) {
         //SecretManagerState is not available.
         //This must not happen if security is turned on.
         return; 
       }
-      fs.loadSecretManagerState(in);
+      namesystem.loadSecretManagerState(in);
     }
 
 
@@ -384,42 +424,42 @@ public abstract class FSImageFormat {
    * The write() function should be called once, after which the getter
    * functions may be used to retrieve information about the file that was written.
    */
-  static class Writer {
+  static class Saver {
     /** Set to true once an image has been written */
-    private boolean written = false;
+    private boolean saved = false;
     
     /** The MD5 checksum of the file that was written */
-    private MD5Hash writtenDigest;
+    private MD5Hash savedDigest;
 
     static private final byte[] PATH_SEPARATOR = DFSUtil.string2Bytes(Path.SEPARATOR);
 
-    /** @throws IllegalStateException if the instance has not yet written an image */
-    private void checkWritten() {
-      if (!written) {
-        throw new IllegalStateException("FSImageWriter has not written an image");
+    /** @throws IllegalStateException if the instance has not yet saved an image */
+    private void checkSaved() {
+      if (!saved) {
+        throw new IllegalStateException("FSImageSaver has not saved an image");
       }
     }
     
-    /** @throws IllegalStateException if the instance has already written an image */
-    private void checkNotWritten() {
-      if (written) {
-        throw new IllegalStateException("FSImageWriter has already written an image");
+    /** @throws IllegalStateException if the instance has already saved an image */
+    private void checkNotSaved() {
+      if (saved) {
+        throw new IllegalStateException("FSImageSaver has already saved an image");
       }
     }
 
     /**
      * Return the MD5 checksum of the image file that was saved.
      */
-    MD5Hash getWrittenDigest() {
-      checkWritten();
-      return writtenDigest;
+    MD5Hash getSavedDigest() {
+      checkSaved();
+      return savedDigest;
     }
 
-    void write(File newFile,
-               FSNamesystem sourceNamesystem,
-               FSImageCompression compression)
+    void save(File newFile,
+              FSNamesystem sourceNamesystem,
+              FSImageCompression compression)
       throws IOException {
-      checkNotWritten();
+      checkNotSaved();
 
       FSDirectory fsDir = sourceNamesystem.dir;
       long startTime = now();
@@ -458,9 +498,9 @@ public abstract class FSImageFormat {
         out.close();
       }
 
-      written = true;
+      saved = true;
       // set md5 of the saved image
-      writtenDigest = new MD5Hash(digester.digest());
+      savedDigest = new MD5Hash(digester.digest());
 
       LOG.info("Image file of size " + newFile.length() + " saved in " 
           + (now() - startTime)/1000 + " seconds.");

+ 8 - 1
src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java

@@ -40,11 +40,18 @@ import org.apache.hadoop.io.Writable;
 /**
  * Static utility functions for serializing various pieces of data in the correct
  * format for the FSImage file.
+ *
+ * Some members are currently public for the benefit of the Offline Image Viewer
+ * which is located outside of this package. These members should be made
+ * package-protected when the OIV is refactored.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public abstract class FSImageSerialization {
+public class FSImageSerialization {
 
+  // Static-only class
+  private FSImageSerialization() {}
+  
   /**
    * In order to reduce allocation, we reuse some static objects. However, the methods
    * in this class should be thread-safe since image-saving is multithreaded, so 

+ 8 - 1
src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -4888,10 +4888,14 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
    * @throws IOException
    */
   Collection<CorruptFileBlockInfo> listCorruptFileBlocks(String path,
-      String startBlockAfter) throws AccessControlException, IOException {
+      String startBlockAfter) throws IOException {
 
     readLock();
     try {
+    if (isInSafeMode()) {
+      throw new IOException("Cannot run listCorruptFileBlocks because " +
+                            "replication queues have not been initialized.");
+    }
     checkSuperuserPrivilege();
     long startBlockId = 0;
     // print a limited # of corrupt files per call
@@ -5186,6 +5190,9 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
     try {
       ObjectName mxbeanName = new ObjectName("HadoopInfo:type=NameNodeInfo");
       mbs.registerMBean(this, mxbeanName);
+    } catch ( javax.management.InstanceAlreadyExistsException iaee ) {
+      // in unit tests, we may run and restart the NN within the same JVM
+      LOG.info("NameNode MXBean already registered");
     } catch ( javax.management.JMException e ) {
       LOG.warn("Failed to register NameNodeMXBean", e);
     }

+ 40 - 0
src/java/org/apache/hadoop/hdfs/server/namenode/INode.java

@@ -446,4 +446,44 @@ abstract class INode implements Comparable<byte[]>, FSInodeInfo {
     }
     return len1 - len2;
   }
+  
+  /**
+   * Create an INode; the inode's name is not set yet
+   * 
+   * @param permissions permissions
+   * @param blocks blocks if a file
+   * @param symlink symblic link if a symbolic link
+   * @param replication replication factor
+   * @param modificationTime modification time
+   * @param atime access time
+   * @param nsQuota namespace quota
+   * @param dsQuota disk quota
+   * @param preferredBlockSize block size
+   * @return an inode
+   */
+  static INode newINode(PermissionStatus permissions,
+                        BlockInfo[] blocks,
+                        String symlink,
+                        short replication,
+                        long modificationTime,
+                        long atime,
+                        long nsQuota,
+                        long dsQuota,
+                        long preferredBlockSize) {
+    if (blocks == null) {
+      if (nsQuota >= 0 || dsQuota >= 0) {
+        return new INodeDirectoryWithQuota(
+            permissions, modificationTime, nsQuota, dsQuota);
+      } 
+      // regular directory
+      return new INodeDirectory(permissions, modificationTime);
+    }
+    // check if symbolic link
+    if (symlink.length() != 0) {
+      return new INodeSymlink(symlink, modificationTime, atime, permissions);
+    } 
+    // file
+    return new INodeFile(permissions, blocks, replication,
+        modificationTime, atime, preferredBlockSize);
+  }
 }

+ 52 - 25
src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java

@@ -320,7 +320,7 @@ class INodeDirectory extends INode {
   <T extends INode> T addNode(String path, T newNode, boolean inheritPermission
       ) throws FileNotFoundException, UnresolvedLinkException  {
     byte[][] pathComponents = getPathComponents(path);        
-    if(addToParent(pathComponents, newNode, null,
+    if(addToParent(pathComponents, newNode,
                     inheritPermission, true) == null)
       return null;
     return newNode;
@@ -335,35 +335,62 @@ class INodeDirectory extends INode {
    * @throws  FileNotFoundException if parent does not exist or 
    *          is not a directory.
    */
-  <T extends INode> INodeDirectory addToParent(
-                                      byte[][] pathComponents,
-                                      T newNode,
-                                      INodeDirectory parent,
-                                      boolean inheritPermission,
-                                      boolean propagateModTime
-                                    ) throws FileNotFoundException, 
-                                             UnresolvedLinkException {
-              
+  INodeDirectory addToParent( byte[] localname,
+                              INode newNode,
+                              INodeDirectory parent,
+                              boolean inheritPermission,
+                              boolean propagateModTime
+                              ) throws FileNotFoundException, 
+                                       UnresolvedLinkException {
+    // insert into the parent children list
+    newNode.name = localname;
+    if(parent.addChild(newNode, inheritPermission, propagateModTime) == null)
+      return null;
+    return parent;
+  }
+
+  INodeDirectory getParent(byte[][] pathComponents)
+  throws FileNotFoundException, UnresolvedLinkException {
     int pathLen = pathComponents.length;
     if (pathLen < 2)  // add root
       return null;
-    if(parent == null) {
-      // Gets the parent INode
-      INode[] inodes  = new INode[2];
-      getExistingPathINodes(pathComponents, inodes, false);
-      INode inode = inodes[0];
-      if (inode == null) {
-        throw new FileNotFoundException("Parent path does not exist: "+
-            DFSUtil.byteArray2String(pathComponents));
-      }
-      if (!inode.isDirectory()) {
-        throw new FileNotFoundException("Parent path is not a directory: "+
-            DFSUtil.byteArray2String(pathComponents));
-      }
-      parent = (INodeDirectory)inode;
+    // Gets the parent INode
+    INode[] inodes  = new INode[2];
+    getExistingPathINodes(pathComponents, inodes, false);
+    INode inode = inodes[0];
+    if (inode == null) {
+      throw new FileNotFoundException("Parent path does not exist: "+
+          DFSUtil.byteArray2String(pathComponents));
     }
-    // insert into the parent children list
+    if (!inode.isDirectory()) {
+      throw new FileNotFoundException("Parent path is not a directory: "+
+          DFSUtil.byteArray2String(pathComponents));
+    }
+    return (INodeDirectory)inode;
+  }
+  
+  /**
+   * Add new inode 
+   * Optimized version of addNode()
+   * 
+   * @return  parent INode if new inode is inserted
+   *          or null if it already exists.
+   * @throws  FileNotFoundException if parent does not exist or 
+   *          is not a directory.
+   */
+  INodeDirectory addToParent( byte[][] pathComponents,
+                              INode newNode,
+                              boolean inheritPermission,
+                              boolean propagateModTime
+                            ) throws FileNotFoundException, 
+                                     UnresolvedLinkException {
+              
+    int pathLen = pathComponents.length;
+    if (pathLen < 2)  // add root
+      return null;
     newNode.name = pathComponents[pathLen-1];
+    // insert into the parent children list
+    INodeDirectory parent = getParent(pathComponents);
     if(parent.addChild(newNode, inheritPermission, propagateModTime) == null)
       return null;
     return parent;

+ 1 - 1
src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java

@@ -43,7 +43,6 @@ import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Trash;
 import org.apache.hadoop.fs.UnresolvedLinkException;
-import org.apache.hadoop.fs.CorruptFileBlocks;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -53,6 +52,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;

+ 1 - 2
src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java

@@ -214,8 +214,7 @@ public class NamenodeFsck {
     }
   }
  
-  private void listCorruptFileBlocks() throws AccessControlException,
-      IOException {
+  private void listCorruptFileBlocks() throws IOException {
     Collection<FSNamesystem.CorruptFileBlockInfo> corruptFiles = namenode.
       getNamesystem().listCorruptFileBlocks(path, startBlockAfter);
     int numCorruptFiles = corruptFiles.size();

+ 2 - 2
src/test/findbugsExcludeFile.xml

@@ -231,8 +231,8 @@
       wrapped streams, too.
      -->
      <Match>
-       <Class name="org.apache.hadoop.hdfs.server.namenode.FSImageFormat$Writer" />
-       <Method name="write" />
+       <Class name="org.apache.hadoop.hdfs.server.namenode.FSImageFormat$Saver" />
+       <Method name="save" />
        <Bug pattern="OS_OPEN_STREAM" />
      </Match>
  </FindBugsFilter>

+ 58 - 57
src/test/hdfs/org/apache/hadoop/hdfs/TestLargeBlock.java

@@ -17,72 +17,68 @@
  */
 package org.apache.hadoop.hdfs;
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Random;
 import java.util.Arrays;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.log4j.Level;
+
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.FSConstants;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.datanode.FSDataset;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.log4j.Level;
 
 import org.junit.Test;
+import static org.junit.Assert.assertTrue;
 
 /**
  * This class tests that blocks can be larger than 2GB
  */
-public class TestLargeBlock extends junit.framework.TestCase {
-  static final String DIR = "/" + TestLargeBlock.class.getSimpleName() + "/";
-
+public class TestLargeBlock {
+/**
   {
-    // ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
-    // ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
-    // ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
-    // ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)TestLargeBlock.LOG).getLogger().setLevel(Level.ALL);
   }
+ */
+  private static final Log LOG = LogFactory.getLog(TestLargeBlock.class);
 
-
-  static final boolean verifyData = true; // should we verify the data read back from the file? (slow)
+  // should we verify the data read back from the file? (slow)
+  static final boolean verifyData = true;
   static final byte[] pattern = { 'D', 'E', 'A', 'D', 'B', 'E', 'E', 'F'};
   static final boolean simulatedStorage = false;
 
   // creates a file 
-  static FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl, final long blockSize)
+  static FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl,
+                                       final long blockSize)
     throws IOException {
     FSDataOutputStream stm = fileSys.create(name, true,
-                                            fileSys.getConf().getInt("io.file.buffer.size", 4096),
-                                            (short)repl, blockSize);
-    System.out.println("createFile: Created " + name + " with " + repl + " replica.");
+        fileSys.getConf().getInt("io.file.buffer.size", 4096),
+        (short)repl, blockSize);
+    LOG.info("createFile: Created " + name + " with " + repl + " replica.");
     return stm;
   }
 
-
   /**
    * Writes pattern to file
+   * @param stm FSDataOutputStream to write the file
+   * @param fileSize size of the file to be written
+   * @throws IOException in case of errors
    */
-  static void writeFile(FSDataOutputStream stm, final long fileSize) throws IOException {
-    final int writeSize = pattern.length * 8 * 1024 * 1024; // write in chunks of 64 MB
-    final int writeCount = (int) ((fileSize / ((long) writeSize)) + ((fileSize % ((long) writeSize) == 0L) ? 0L : 1L));
+  static void writeFile(FSDataOutputStream stm, final long fileSize)
+      throws IOException {
+    // write in chunks of 64 MB
+    final int writeSize = pattern.length * 8 * 1024 * 1024;
 
     if (writeSize > Integer.MAX_VALUE) {
       throw new IOException("A single write is too large " + writeSize);
@@ -96,24 +92,26 @@ public class TestLargeBlock extends junit.framework.TestCase {
       b[j] = pattern[j % pattern.length];
     }
 
-    int i = 0;
-
     while (bytesToWrite > 0) {
-      int thiswrite = (int) Math.min(writeSize, bytesToWrite); // how many bytes we are writing in this iteration
+      // how many bytes we are writing in this iteration
+      int thiswrite = (int) Math.min(writeSize, bytesToWrite);
 
       stm.write(b, 0, thiswrite);
-      // System.out.println("Wrote[" + i + "/" + writeCount + "] " + thiswrite + " bytes.");
       bytesToWrite -= thiswrite;
-      i++;
     }
   }
 
   /**
    * Reads from file and makes sure that it matches the pattern
+   * @param fs a reference to FileSystem
+   * @param name Path of a file
+   * @param fileSize size of the file
+   * @throws IOException in case of errors
    */
-  static void checkFullFile(FileSystem fs, Path name, final long fileSize) throws IOException {
-    final int readSize = pattern.length * 16 * 1024 * 1024; // read in chunks of 128 MB
-    final int readCount = (int) ((fileSize / ((long) readSize)) + ((fileSize % ((long) readSize) == 0L) ? 0L : 1L));
+  static void checkFullFile(FileSystem fs, Path name, final long fileSize)
+      throws IOException {
+    // read in chunks of 128 MB
+    final int readSize = pattern.length * 16 * 1024 * 1024;
 
     if (readSize > Integer.MAX_VALUE) {
       throw new IOException("A single read is too large " + readSize);
@@ -131,39 +129,39 @@ public class TestLargeBlock extends junit.framework.TestCase {
       }
     }
 
-
     FSDataInputStream stm = fs.open(name);
 
-    int i = 0;
-
     while (bytesToRead > 0) {
-      int thisread = (int) Math.min(readSize, bytesToRead); // how many bytes we are reading in this iteration
+      // how many bytes we are reading in this iteration
+      int thisread = (int) Math.min(readSize, bytesToRead);
 
       stm.readFully(b, 0, thisread); 
       
       if (verifyData) {
         // verify data read
-        
         if (thisread == readSize) {
-          assertTrue("file corrupted at or after byte " + (fileSize - bytesToRead), Arrays.equals(b, compb));
+          assertTrue("file is corrupted at or after byte " +
+              (fileSize - bytesToRead), Arrays.equals(b, compb));
         } else {
           // b was only partially filled by last read
           for (int k = 0; k < thisread; k++) {
-            assertTrue("file corrupted at or after byte " + (fileSize - bytesToRead), b[k] == compb[k]);
+            assertTrue("file is corrupted at or after byte " +
+                (fileSize - bytesToRead), b[k] == compb[k]);
           }
         }
       }
-
-      // System.out.println("Read[" + i + "/" + readCount + "] " + thisread + " bytes.");
-
+      LOG.debug("Before update: to read: " + bytesToRead +
+          "; read already: "+ thisread);
       bytesToRead -= thisread;
-      i++;
+      LOG.debug("After  update: to read: " + bytesToRead +
+          "; read already: " + thisread);
     }
     stm.close();
   }
  
   /**
    * Test for block size of 2GB + 512B
+   * @throws IOException in case of errors
    */
   @Test
   public void testLargeBlockSize() throws IOException {
@@ -173,6 +171,8 @@ public class TestLargeBlock extends junit.framework.TestCase {
   
   /**
    * Test that we can write to and read from large blocks
+   * @param blockSize size of the block
+   * @throws IOException in case of errors
    */
   public void runTest(final long blockSize) throws IOException {
 
@@ -188,11 +188,12 @@ public class TestLargeBlock extends junit.framework.TestCase {
     try {
 
       // create a new file in test data directory
-      Path file1 = new Path(System.getProperty("test.build.data") + "/" + Long.toString(blockSize) + ".dat");
+      Path file1 = new Path(System.getProperty("test.build.data") + "/" +
+          Long.toString(blockSize) + ".dat");
       FSDataOutputStream stm = createFile(fs, file1, 1, blockSize);
-      System.out.println("File " + file1 + " created with file size " +
-                         fileSize +
-                         " blocksize " + blockSize);
+      LOG.info("File " + file1 + " created with file size " +
+          fileSize +
+          " blocksize " + blockSize);
 
       // verify that file exists in FS namespace
       assertTrue(file1 + " should be a file", 
@@ -200,11 +201,11 @@ public class TestLargeBlock extends junit.framework.TestCase {
 
       // write to file
       writeFile(stm, fileSize);
-      System.out.println("File " + file1 + " written to.");
+      LOG.info("File " + file1 + " written to.");
 
       // close file
       stm.close();
-      System.out.println("File " + file1 + " closed.");
+      LOG.info("File " + file1 + " closed.");
 
       // Make sure a client can read it
       checkFullFile(fs, file1, fileSize);
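
For readers skimming the diff: checkFullFile reads the file in fixed 128 MB chunks with readFully and compares each chunk against a precomputed pattern buffer, falling back to a byte-by-byte check for the final partial chunk. A minimal standalone sketch of that pattern (verifyInChunks and its parameters are illustrative, not part of the patch):

import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class ChunkedVerifySketch {
  // Illustrative helper: read 'fileSize' bytes from 'name' in chunks of
  // 'readSize' bytes and check every chunk against the expected pattern
  // buffer (assumed to be exactly 'readSize' bytes long).
  static void verifyInChunks(FileSystem fs, Path name, long fileSize,
      byte[] expected, int readSize) throws IOException {
    byte[] b = new byte[readSize];
    long bytesToRead = fileSize;
    FSDataInputStream stm = fs.open(name);
    try {
      while (bytesToRead > 0) {
        int thisread = (int) Math.min(readSize, bytesToRead);
        stm.readFully(b, 0, thisread);          // fills exactly 'thisread' bytes
        if (thisread == readSize) {
          // full chunk: compare the whole buffer at once
          if (!Arrays.equals(b, expected)) {
            throw new IOException("corrupt at or after byte "
                + (fileSize - bytesToRead));
          }
        } else {
          // last, partial chunk: compare only the bytes actually read
          for (int k = 0; k < thisread; k++) {
            if (b[k] != expected[k]) {
              throw new IOException("corrupt at or after byte "
                  + (fileSize - bytesToRead));
            }
          }
        }
        bytesToRead -= thisread;
      }
    } finally {
      stm.close();
    }
  }
}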

+ 79 - 0
src/test/hdfs/org/apache/hadoop/hdfs/protocol/TestCorruptFileBlocks.java

@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.protocol;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import static org.junit.Assert.assertTrue;
+import org.junit.Test;
+
+import org.apache.hadoop.io.DataOutputBuffer;
+
+public class TestCorruptFileBlocks {
+
+  /**
+   * Serialize the cfb given, deserialize and return the result.
+   */
+  static CorruptFileBlocks serializeAndDeserialize(CorruptFileBlocks cfb) 
+    throws IOException {
+    DataOutputBuffer buf = new DataOutputBuffer();
+    cfb.write(buf);
+
+    byte[] data = buf.getData();
+    DataInputStream input = new DataInputStream(new ByteArrayInputStream(data));
+
+    CorruptFileBlocks result = new CorruptFileBlocks();
+    result.readFields(input);
+
+    return result;
+  }
+
+  /**
+   * Check whether cfb is unchanged after serialization and deserialization.
+   */
+  static boolean checkSerialize(CorruptFileBlocks cfb)
+    throws IOException {
+    return cfb.equals(serializeAndDeserialize(cfb));
+  }
+
+  /**
+   * Test serialization and deserialization of CorruptFileBlocks.
+   */
+  @Test
+  public void testSerialization() throws IOException {
+    {
+      CorruptFileBlocks cfb = new CorruptFileBlocks();
+      assertTrue("cannot serialize empty CFB", checkSerialize(cfb));
+    }
+
+    {
+      String[] files = new String[0];
+      CorruptFileBlocks cfb = new CorruptFileBlocks(files, "");
+      assertTrue("cannot serialize CFB with empty cookie", checkSerialize(cfb));
+    }
+
+    {
+      String[] files = { "a", "bb", "ccc" };
+      CorruptFileBlocks cfb = new CorruptFileBlocks(files, "test");
+      assertTrue("cannot serialize CFB", checkSerialize(cfb));
+    }
+  }
+}
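
The test above exercises the usual Hadoop Writable round trip: write into a DataOutputBuffer, wrap the resulting bytes in a DataInputStream, call readFields on a fresh instance, and compare with equals. A generic sketch of that round trip, assuming only that the type has a no-arg constructor and a meaningful equals():

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Writable;

class WritableRoundTrip {
  // Serialize 'original' and deserialize into 'empty' (a freshly constructed
  // instance of the same type), then report whether the two compare equal.
  static <T extends Writable> boolean roundTripEquals(T original, T empty)
      throws IOException {
    DataOutputBuffer out = new DataOutputBuffer();
    original.write(out);                               // Writable serialization
    DataInputStream in = new DataInputStream(
        new ByteArrayInputStream(out.getData(), 0, out.getLength()));
    empty.readFields(in);                              // Writable deserialization
    return original.equals(empty);
  }
}

For example, roundTripEquals(new CorruptFileBlocks(files, "test"), new CorruptFileBlocks()) mirrors what checkSerialize does in the test.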

+ 1 - 1
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java

@@ -39,7 +39,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.CorruptFileBlocks;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -47,6 +46,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.tools.DFSck;
 import org.apache.hadoop.io.IOUtils;

+ 43 - 12
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java

@@ -25,13 +25,16 @@ import java.nio.channels.FileChannel;
 import java.util.Collection;
 import java.util.Random;
 
-import junit.framework.TestCase;
+import org.junit.Test;
+import static org.junit.Assert.assertTrue;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.CorruptFileBlocks;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.BlockMissingException;
+import org.apache.hadoop.hdfs.CorruptFileBlockIterator;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -45,10 +48,11 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
  * with a block # from a previous call and validate that the subsequent
  * blocks/files are also returned.
  */
-public class TestListCorruptFileBlocks extends TestCase {
+public class TestListCorruptFileBlocks {
   static Log LOG = NameNode.stateChangeLog;
 
   /** check if nn.getCorruptFiles() returns a file that has corrupted blocks */
+  @Test
   public void testListCorruptFilesCorruptedBlock() throws Exception {
     MiniDFSCluster cluster = null;
     Random random = new Random();
@@ -120,6 +124,7 @@ public class TestListCorruptFileBlocks extends TestCase {
   }
   
   // deliberately remove blocks from a file and validate the list-corrupt-file-blocks API
+  @Test
   public void testlistCorruptFileBlocks() throws Exception {
     Configuration conf = new Configuration();
     conf.setLong("dfs.blockreport.intervalMsec", 1000);
@@ -214,9 +219,19 @@ public class TestListCorruptFileBlocks extends TestCase {
     }
   }
 
+  private int countPaths(RemoteIterator<Path> iter) throws IOException {
+    int i = 0;
+    while (iter.hasNext()) {
+      LOG.info("PATH: " + iter.next().toUri().getPath());
+      i++;
+    }
+    return i;
+  }
+
   /**
    * test listCorruptFileBlocks in DistributedFileSystem
-   */ 
+   */
+  @Test
   public void testlistCorruptFileBlocksDFS() throws Exception {
     Configuration conf = new Configuration();
     conf.setLong("dfs.blockreport.intervalMsec", 1000);
@@ -234,9 +249,9 @@ public class TestListCorruptFileBlocks extends TestCase {
       util.createFiles(fs, "/corruptData");
 
       final NameNode namenode = cluster.getNameNode();
-      CorruptFileBlocks corruptFileBlocks = 
-        dfs.listCorruptFileBlocks("/corruptData", null);
-      int numCorrupt = corruptFileBlocks.getFiles().length;
+      RemoteIterator<Path> corruptFileBlocks = 
+        dfs.listCorruptFileBlocks(new Path("/corruptData"));
+      int numCorrupt = countPaths(corruptFileBlocks);
       assertTrue(numCorrupt == 0);
       // delete the blocks
       File baseDir = new File(System.getProperty("test.build.data",
@@ -260,12 +275,12 @@ public class TestListCorruptFileBlocks extends TestCase {
       }
 
       int count = 0;
-      corruptFileBlocks = dfs.listCorruptFileBlocks("/corruptData", null);
-      numCorrupt = corruptFileBlocks.getFiles().length;
+      corruptFileBlocks = dfs.listCorruptFileBlocks(new Path("/corruptData"));
+      numCorrupt = countPaths(corruptFileBlocks);
       while (numCorrupt < 3) {
         Thread.sleep(1000);
-        corruptFileBlocks = dfs.listCorruptFileBlocks("/corruptData", null);
-        numCorrupt = corruptFileBlocks.getFiles().length;
+        corruptFileBlocks = dfs.listCorruptFileBlocks(new Path("/corruptData"));
+        numCorrupt = countPaths(corruptFileBlocks);
         count++;
         if (count > 30)
           break;
@@ -283,7 +298,12 @@ public class TestListCorruptFileBlocks extends TestCase {
     }
   }
     
-  /** check if NN.listCorruptFiles() returns the right limit */
+  /**
+   * Test if NN.listCorruptFiles() returns the right number of results.
+   * Also, test that DFS.listCorruptFileBlocks can make multiple successive
+   * calls.
+   */
+  @Test
   public void testMaxCorruptFiles() throws Exception {
     MiniDFSCluster cluster = null;
     try {
@@ -343,6 +363,17 @@ public class TestListCorruptFileBlocks extends TestCase {
       assertTrue("Namenode has " + badFiles.size() + " bad files. Expecting " + 
           maxCorruptFileBlocks + ".",
           badFiles.size() == maxCorruptFileBlocks);
+
+      CorruptFileBlockIterator iter = (CorruptFileBlockIterator)
+        fs.listCorruptFileBlocks(new Path("/srcdat2"));
+      int corruptPaths = countPaths(iter);
+      assertTrue("Expected more than " + maxCorruptFileBlocks +
+                 " corrupt file blocks but got " + corruptPaths,
+                 corruptPaths > maxCorruptFileBlocks);
+      assertTrue("Iterator should have made more than 1 call but made " +
+                 iter.getCallsMade(),
+                 iter.getCallsMade() > 1);
+
       util.cleanup(fs, "/srcdat2");
     } finally {
       if (cluster != null) { cluster.shutdown(); }
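
With HDFS-1533, clients no longer drive the cookie-based CorruptFileBlocks call directly; they use FileSystem#listCorruptFileBlocks(Path), which returns a RemoteIterator<Path> while CorruptFileBlockIterator handles the paging cookie internally. That is why testMaxCorruptFiles can check getCallsMade() > 1. A hedged usage sketch of the new API (printCorruptFiles is an illustrative helper, not test code):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

class ListCorruptSketch {
  // Walk the corrupt-file-block iterator for a directory, print each path,
  // and return how many corrupt file blocks were reported.
  static int printCorruptFiles(Configuration conf, String dir)
      throws IOException {
    FileSystem fs = FileSystem.get(conf);
    RemoteIterator<Path> it = fs.listCorruptFileBlocks(new Path(dir));
    int n = 0;
    while (it.hasNext()) {               // the iterator fetches further
      System.out.println(it.next());     // batches from the NameNode as needed
      n++;
    }
    return n;
  }
}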

+ 3 - 9
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSaveNamespace.java

@@ -60,13 +60,8 @@ public class TestSaveNamespace {
 
   private static class FaultySaveImage implements Answer<Void> {
     int count = 0;
-    FSImage origImage;
 
-    public FaultySaveImage(FSImage origImage) {
-      this.origImage = origImage;
-    }
-
-    public Void answer(InvocationOnMock invocation) throws Exception {
+    public Void answer(InvocationOnMock invocation) throws Throwable {
       Object[] args = invocation.getArguments();
       File f = (File)args[0];
 
@@ -75,8 +70,7 @@ public class TestSaveNamespace {
         throw new RuntimeException("Injected fault: saveFSImage second time");
       }
       LOG.info("Not injecting fault for file: " + f);
-      origImage.saveFSImage(f);
-      return null;
+      return (Void)invocation.callRealMethod();
     }
   }
 
@@ -104,7 +98,7 @@ public class TestSaveNamespace {
     switch(fault) {
     case SAVE_FSIMAGE:
       // The spy throws a RuntimeException when writing to the second directory
-      doAnswer(new FaultySaveImage(spyImage)).
+      doAnswer(new FaultySaveImage()).
         when(spyImage).saveFSImage((File)anyObject());
       break;
     case MOVE_CURRENT:

+ 19 - 11
src/test/unit/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java

@@ -138,7 +138,8 @@ public class TestBlockRecovery {
   private void testSyncReplicas(ReplicaRecoveryInfo replica1, 
       ReplicaRecoveryInfo replica2,
       InterDatanodeProtocol dn1,
-      InterDatanodeProtocol dn2) throws IOException {
+      InterDatanodeProtocol dn2,
+      long expectLen) throws IOException {
     
     DatanodeInfo[] locs = new DatanodeInfo[]{
         mock(DatanodeInfo.class), mock(DatanodeInfo.class)};
@@ -151,6 +152,13 @@ public class TestBlockRecovery {
         new DatanodeID("aa", "bb", 11, 22), dn2, replica2);
     syncList.add(record1);
     syncList.add(record2);
+    
+    when(dn1.updateReplicaUnderRecovery((Block)anyObject(), anyLong(), 
+        anyLong())).thenReturn(new Block(block.getBlockId(), 
+            expectLen, block.getGenerationStamp()));
+    when(dn2.updateReplicaUnderRecovery((Block)anyObject(), anyLong(), 
+        anyLong())).thenReturn(new Block(block.getBlockId(), 
+            expectLen, block.getGenerationStamp()));
     dn.syncBlock(rBlock, syncList);
   }
   
@@ -172,7 +180,7 @@ public class TestBlockRecovery {
     InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
     InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);
 
-    testSyncReplicas(replica1, replica2, dn1, dn2);
+    testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1);
     verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1);
     verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1);    
 
@@ -183,7 +191,7 @@ public class TestBlockRecovery {
         REPLICA_LEN2, GEN_STAMP-2, ReplicaState.FINALIZED);
 
     try {
-      testSyncReplicas(replica1, replica2, dn1, dn2);
+      testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1);
      Assert.fail("Two finalized replicas should not have different lengths!");
     } catch (IOException e) {
       Assert.assertTrue(e.getMessage().startsWith(
@@ -211,7 +219,7 @@ public class TestBlockRecovery {
     InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
     InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);
 
-    testSyncReplicas(replica1, replica2, dn1, dn2);
+    testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1);
     verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1);
     verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1);
     
@@ -224,7 +232,7 @@ public class TestBlockRecovery {
     dn1 = mock(InterDatanodeProtocol.class);
     dn2 = mock(InterDatanodeProtocol.class);
 
-    testSyncReplicas(replica1, replica2, dn1, dn2);
+    testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1);
     verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1);
     verify(dn2, never()).updateReplicaUnderRecovery(
         block, RECOVERY_ID, REPLICA_LEN1);
@@ -250,7 +258,7 @@ public class TestBlockRecovery {
     InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
     InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);
 
-    testSyncReplicas(replica1, replica2, dn1, dn2);
+    testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1);
     verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1);
     verify(dn2, never()).updateReplicaUnderRecovery(
         block, RECOVERY_ID, REPLICA_LEN1);
@@ -264,7 +272,7 @@ public class TestBlockRecovery {
     dn1 = mock(InterDatanodeProtocol.class);
     dn2 = mock(InterDatanodeProtocol.class);
 
-    testSyncReplicas(replica1, replica2, dn1, dn2);
+    testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1);
     verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1);
     verify(dn2, never()).updateReplicaUnderRecovery(
         block, RECOVERY_ID, REPLICA_LEN1);
@@ -288,8 +296,8 @@ public class TestBlockRecovery {
     InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
     InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);
 
-    testSyncReplicas(replica1, replica2, dn1, dn2);
     long minLen = Math.min(REPLICA_LEN1, REPLICA_LEN2);
+    testSyncReplicas(replica1, replica2, dn1, dn2, minLen);
     verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, minLen);
     verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, minLen);    
   }
@@ -312,7 +320,7 @@ public class TestBlockRecovery {
     InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
     InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);
 
-    testSyncReplicas(replica1, replica2, dn1, dn2);
+    testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1);
     verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1);
     verify(dn2, never()).updateReplicaUnderRecovery(
         block, RECOVERY_ID, REPLICA_LEN1);    
@@ -336,9 +344,9 @@ public class TestBlockRecovery {
     InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
     InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);
 
-    testSyncReplicas(replica1, replica2, dn1, dn2);
-    
     long minLen = Math.min(REPLICA_LEN1, REPLICA_LEN2);
+    testSyncReplicas(replica1, replica2, dn1, dn2, minLen);
+    
     verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, minLen);
     verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, minLen);    
   }
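
The new expectLen argument lets each mocked datanode return a plausible Block from updateReplicaUnderRecovery instead of Mockito's default null, which is what previously tripped the NPE inside the assert (HDFS-1502). Reduced to its essentials, the stubbing looks roughly like this (stubbedDatanode is an illustrative helper, not test code):

import static org.mockito.Mockito.*;

import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;

class RecoveryStubSketch {
  // Make a mocked datanode report a recovered replica of 'expectLen' bytes
  // for whatever block, recovery id and length it is asked about.
  static InterDatanodeProtocol stubbedDatanode(Block block, long expectLen)
      throws Exception {
    InterDatanodeProtocol dn = mock(InterDatanodeProtocol.class);
    when(dn.updateReplicaUnderRecovery((Block) anyObject(), anyLong(), anyLong()))
        .thenReturn(new Block(block.getBlockId(), expectLen,
            block.getGenerationStamp()));
    return dn;
  }
}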