
Merge changes r1038221:r1038859 from trunk to federation. The code does not compile until the change for HDFS-1533 is merged.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-1052@1078908 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas 14 years ago
parent
commit
659b243958

+ 10 - 2
CHANGES.txt

@@ -211,6 +211,9 @@ Trunk (unreleased changes)
     HDFS-1700. Federation: fsck needs to work with federation changes.
     (Matt Foley via suresh)
 
+    HDFS-1482. Add listCorruptFileBlocks to DistributedFileSystem.
+    (Patrick Kling via hairong)
+
   IMPROVEMENTS
 
     HDFS-1510. Added test-patch.properties required by test-patch.sh (nigel)
@@ -227,8 +230,13 @@ Trunk (unreleased changes)
     HDFS-1588. Remove hardcoded strings for configuration keys, "dfs.hosts"
     and "dfs.hosts.exlude". (Erik Steffl via suresh)
 
+    HDFS-1481. NameNode should validate fsimage before rolling. (hairong)
+
   OPTIMIZATIONS
 
+    HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
+    downloads and loading. (hairong)
+
   BUG FIXES
 
     HDFS-1449. Fix test failures - ExtendedBlock must return 
@@ -246,6 +254,8 @@ Trunk (unreleased changes)
 
     HDFS-1684. Balancer cannot start with with multiple namenodes.  (szetszwo)
 
+    HDFS-1516. mvn-install is broken after 0.22 branch creation. (cos)
+
 Release 0.22.0 - Unreleased
 
   NEW FEATURES
@@ -464,8 +474,6 @@ Release 0.22.0 - Unreleased
 
     HDFS-1513. Fix a number of warnings. (eli)
 
-    HDFS-1481. NameNode should validate fsimage before rolling. (hairong)
-
   OPTIMIZATIONS
 
     HDFS-1140. Speedup INode.getPathComponents. (Dmytro Molkov via shv)

+ 1 - 2
build.xml

@@ -28,8 +28,7 @@
  
   <property name="Name" value="Hadoop-Hdfs"/>
   <property name="name" value="hadoop-hdfs"/>
-  <!-- Need to change aop.xml project.version prop. synchronously
-   -->
+  <!-- ATTN: Need to change aop.xml's project.version prop. synchronously -->
   <property name="version" value="0.23.0-SNAPSHOT"/>
   <property name="final.name" value="${name}-${version}"/>
   <property name="test.hdfs.final.name" value="${name}-test-${version}"/>

+ 2 - 2
ivy/libraries.properties

@@ -34,8 +34,8 @@ commons-net.version=1.4.1
 core.version=3.1.1
 coreplugin.version=1.3.2
 
-hadoop-common.version=0.22.0-SNAPSHOT
-hadoop-hdfs.version=0.22.0-SNAPSHOT
+hadoop-common.version=0.23.0-SNAPSHOT
+hadoop-hdfs.version=0.23.0-SNAPSHOT
 
 hsqldb.version=1.8.0.10
 

+ 10 - 0
src/java/org/apache/hadoop/fs/Hdfs.java

@@ -303,6 +303,16 @@ public class Hdfs extends AbstractFileSystem {
     return listing.toArray(new FileStatus[listing.size()]);
   }
 
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public CorruptFileBlocks listCorruptFileBlocks(String path,
+                                                 String cookie)
+    throws IOException {
+    return dfs.listCorruptFileBlocks(path, cookie);
+  }
+
   @Override
   public void mkdir(Path dir, FsPermission permission, boolean createParent)
     throws IOException, UnresolvedLinkException {

+ 11 - 0
src/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -61,6 +61,7 @@ import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.CorruptFileBlocks;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -1115,6 +1116,16 @@ public class DFSClient implements FSConstants, java.io.Closeable {
     return namenode.getStats()[ClientProtocol.GET_STATS_CORRUPT_BLOCKS_IDX];
   }
   
+  /**
+   * @return a list in which each entry describes a corrupt file/block
+   * @throws IOException
+   */
+  public CorruptFileBlocks listCorruptFileBlocks(String path,
+                                                 String cookie)
+    throws IOException {
+    return namenode.listCorruptFileBlocks(path, cookie);
+  }
+
   public DatanodeInfo[] datanodeReport(DatanodeReportType type)
   throws IOException {
     return namenode.getDatanodeReport(type);

+ 11 - 0
src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java

@@ -43,6 +43,7 @@ import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.CorruptFileBlocks;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -601,6 +602,16 @@ public class DistributedFileSystem extends FileSystem {
     return dfs.getCorruptBlocksCount();
   }
 
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public CorruptFileBlocks listCorruptFileBlocks(String path,
+                                                 String cookie)
+    throws IOException {
+    return dfs.listCorruptFileBlocks(path, cookie);
+  }
+
   /** Return statistics for each datanode. */
   public DatanodeInfo[] getDataNodeStats() throws IOException {
     return dfs.datanodeReport(DatanodeReportType.ALL);

+ 17 - 2
src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.CorruptFileBlocks;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -67,9 +68,9 @@ public interface ClientProtocol extends VersionedProtocol {
    * Compared to the previous version the following changes have been introduced:
    * (Only the latest change is reflected.
    * The log of historical changes can be retrieved from the svn).
-   * 65: Add block pool ID to Block
+   * 66: Add block pool ID to Block
    */
-  public static final long versionID = 65L;
+  public static final long versionID = 66L;
   
   ///////////////////////////////////////
   // File contents
@@ -659,6 +660,20 @@ public interface ClientProtocol extends VersionedProtocol {
   public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action) 
       throws IOException;
 
+  /**
+   * @return CorruptFileBlocks, containing a list of corrupt files (with
+   *         duplicates if there is more than one corrupt block in a file)
+   *         and a cookie
+   * @throws IOException
+   *
+   * Each call returns a subset of the corrupt files in the system. To obtain
+   * all corrupt files, call this method repeatedly and each time pass in the
+   * cookie returned from the previous call.
+   */
+  public CorruptFileBlocks
+    listCorruptFileBlocks(String path, String cookie)
+    throws IOException;
+  
   /**
    * Dumps namenode data structures into specified file. If the file
    * already exists, then append.

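Editor's note: the new ClientProtocol/DistributedFileSystem call above is paged — each call returns a batch of corrupt-file paths plus a cookie, and the caller passes that cookie back to fetch the next batch. A minimal caller-side sketch follows (not part of this change; getFiles() is used in the TestFsck update below, while getCookie() and the cast of the default filesystem to DistributedFileSystem are assumptions made for illustration only):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CorruptFileBlocks;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class ListAllCorruptFiles {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Assumes fs.defaultFS points at an HDFS cluster.
        DistributedFileSystem dfs =
            (DistributedFileSystem) FileSystem.get(conf);

        String cookie = null;               // null/empty cookie starts from the beginning
        CorruptFileBlocks batch;
        do {
          batch = dfs.listCorruptFileBlocks("/", cookie);
          for (String path : batch.getFiles()) {
            // A path may appear once per corrupt block in that file.
            System.out.println("corrupt: " + path);
          }
          cookie = batch.getCookie();       // assumed accessor; pass back for the next page
        } while (batch.getFiles().length > 0);
      }
    }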
+ 17 - 12
src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java

@@ -43,6 +43,7 @@ import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Trash;
 import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.CorruptFileBlocks;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -1138,19 +1139,23 @@ public class NameNode implements NamenodeProtocols, FSConstants {
   }
 
   /**
-   * 
-   * @param path
-   *          Sub-tree used in querying corrupt files
-   * @param startBlockAfter
-   *          Paging support---pass in the last block returned from the previous
-   *          call and some # of corrupt blocks after that point are returned
-   * @return a list in which each entry describes a corrupt file/block
-   * @throws AccessControlException
-   * @throws IOException
+   * {@inheritDoc}
    */
-  public Collection<FSNamesystem.CorruptFileBlockInfo> listCorruptFileBlocks(String path,
-      String startBlockAfter) throws AccessControlException, IOException {
-    return namesystem.listCorruptFileBlocks(path, startBlockAfter);
+  @Override
+  public CorruptFileBlocks
+    listCorruptFileBlocks(String path, String cookie) 
+    throws IOException {
+    Collection<FSNamesystem.CorruptFileBlockInfo> fbs =
+      namesystem.listCorruptFileBlocks(path, cookie);
+    
+    String[] files = new String[fbs.size()];
+    String lastCookie = "";
+    int i = 0;
+    for(FSNamesystem.CorruptFileBlockInfo fb: fbs) {
+      files[i++] = fb.path;
+      lastCookie = fb.block.getBlockName();
+    }
+    return new CorruptFileBlocks(files, lastCookie);
   }
   
   /** {@inheritDoc} */

+ 2 - 2
src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java

@@ -216,8 +216,8 @@ public class NamenodeFsck {
  
   private void listCorruptFileBlocks() throws AccessControlException,
       IOException {
-    Collection<FSNamesystem.CorruptFileBlockInfo> corruptFiles = namenode
-        .listCorruptFileBlocks(path, startBlockAfter);
+    Collection<FSNamesystem.CorruptFileBlockInfo> corruptFiles = namenode.
+      getNamesystem().listCorruptFileBlocks(path, startBlockAfter);
     int numCorruptFiles = corruptFiles.size();
     String filler;
     if (numCorruptFiles > 0) {

+ 52 - 27
src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java

@@ -312,6 +312,7 @@ public class SecondaryNameNode implements Runnable {
         LOG.error("Exception in doCheckpoint: ");
         LOG.error(StringUtils.stringifyException(e));
         e.printStackTrace();
+        checkpointImage.imageDigest = null;
       } catch (Throwable e) {
         LOG.error("Throwable Exception in doCheckpoint: ");
         LOG.error(StringUtils.stringifyException(e));
@@ -324,28 +325,39 @@ public class SecondaryNameNode implements Runnable {
   /**
    * Download <code>fsimage</code> and <code>edits</code>
    * files from the name-node.
+   * @return true if a new image has been downloaded and needs to be loaded
    * @throws IOException
    */
-  private void downloadCheckpointFiles(final CheckpointSignature sig
+  private boolean downloadCheckpointFiles(final CheckpointSignature sig
                                       ) throws IOException {
     try {
-        UserGroupInformation.getCurrentUser().doAs(new PrivilegedExceptionAction<Void>() {
+        Boolean b = UserGroupInformation.getCurrentUser().doAs(
+            new PrivilegedExceptionAction<Boolean>() {
   
           @Override
-          public Void run() throws Exception {
+          public Boolean run() throws Exception {
             checkpointImage.cTime = sig.cTime;
             checkpointImage.checkpointTime = sig.checkpointTime;
-            checkpointImage.imageDigest = sig.imageDigest;
-        
+                    
             // get fsimage
-            String fileid = "getimage=1";
-            Collection<File> list = checkpointImage.getFiles(NameNodeFile.IMAGE,
-                NameNodeDirType.IMAGE);
-            File[] srcNames = list.toArray(new File[list.size()]);
-            assert srcNames.length > 0 : "No checkpoint targets.";
-            TransferFsImage.getFileClient(fsName, fileid, srcNames, false);
-            LOG.info("Downloaded file " + srcNames[0].getName() + " size " +
-                     srcNames[0].length() + " bytes.");
+            String fileid;
+            Collection<File> list;
+            File[] srcNames;
+            boolean downloadImage = true;
+            if (sig.imageDigest.equals(checkpointImage.imageDigest)) {
+              downloadImage = false;
+              LOG.info("Image has not changed. Will not download image.");
+            } else {
+              fileid = "getimage=1";
+              list = checkpointImage.getFiles(NameNodeFile.IMAGE,
+                  NameNodeDirType.IMAGE);
+              srcNames = list.toArray(new File[list.size()]);
+              assert srcNames.length > 0 : "No checkpoint targets.";
+              TransferFsImage.getFileClient(fsName, fileid, srcNames, false);
+              checkpointImage.imageDigest = sig.imageDigest;
+              LOG.info("Downloaded file " + srcNames[0].getName() + " size " +
+                  srcNames[0].length() + " bytes.");
+            }
         
             // get edits file
             fileid = "getedit=1";
@@ -357,9 +369,10 @@ public class SecondaryNameNode implements Runnable {
                 srcNames[0].length() + " bytes.");
         
             checkpointImage.checkpointUploadDone();
-            return null;
+            return Boolean.valueOf(downloadImage);
           }
         });
+        return b.booleanValue();
       } catch (InterruptedException e) {
         throw new RuntimeException(e);
       }
@@ -408,8 +421,9 @@ public class SecondaryNameNode implements Runnable {
 
   /**
    * Create a new checkpoint
+   * @return if the image is fetched from primary or not
    */
-  void doCheckpoint() throws IOException {
+  boolean doCheckpoint() throws IOException {
 
     // Do the required initialization of the merge work area.
     startCheckpoint();
@@ -424,8 +438,8 @@ public class SecondaryNameNode implements Runnable {
                             "after creating edits.new");
     }
 
-    downloadCheckpointFiles(sig);   // Fetch fsimage and edits
-    doMerge(sig);                   // Do the merge
+    boolean loadImage = downloadCheckpointFiles(sig);   // Fetch fsimage and edits
+    doMerge(sig, loadImage);                   // Do the merge
   
     //
     // Upload the new image into the NameNode. Then tell the Namenode
@@ -444,6 +458,8 @@ public class SecondaryNameNode implements Runnable {
 
     LOG.warn("Checkpoint done. New Image Size: " 
               + checkpointImage.getFsImageName().length());
+    
+    return loadImage;
   }
 
   private void startCheckpoint() throws IOException {
@@ -457,11 +473,12 @@ public class SecondaryNameNode implements Runnable {
    * Merge downloaded image and edits and write the new image into
    * current storage directory.
    */
-  private void doMerge(CheckpointSignature sig) throws IOException {
+  private void doMerge(CheckpointSignature sig, boolean loadImage)
+  throws IOException {
     FSNamesystem namesystem = 
             new FSNamesystem(checkpointImage, conf);
     assert namesystem.dir.fsImage == checkpointImage;
-    checkpointImage.doMerge(sig);
+    checkpointImage.doMerge(sig, loadImage);
   }
 
   /**
@@ -665,21 +682,29 @@ public class SecondaryNameNode implements Runnable {
     /**
      * Merge image and edits, and verify consistency with the signature.
      */
-    private void doMerge(CheckpointSignature sig) throws IOException {
+    private void doMerge(CheckpointSignature sig, boolean loadImage)
+    throws IOException {
       getEditLog().open();
       StorageDirectory sdName = null;
       StorageDirectory sdEdits = null;
       Iterator<StorageDirectory> it = null;
-      it = dirIterator(NameNodeDirType.IMAGE);
-      if (it.hasNext())
-        sdName = it.next();
+      if (loadImage) {
+        it = dirIterator(NameNodeDirType.IMAGE);
+        if (it.hasNext())
+          sdName = it.next();
+        if (sdName == null) {
+          throw new IOException("Could not locate checkpoint fsimage");
+        }
+      }
       it = dirIterator(NameNodeDirType.EDITS);
       if (it.hasNext())
         sdEdits = it.next();
-      if ((sdName == null) || (sdEdits == null))
-        throw new IOException("Could not locate checkpoint directories");
-      this.layoutVersion = -1; // to avoid assert in loadFSImage()
-      loadFSImage(FSImage.getImageFile(sdName, NameNodeFile.IMAGE));
+      if (sdEdits == null)
+        throw new IOException("Could not locate checkpoint edits");
+      if (loadImage) {
+        this.layoutVersion = -1; // to avoid assert in loadFSImage()
+        loadFSImage(FSImage.getImageFile(sdName, NameNodeFile.IMAGE));
+      }
       loadFSEdits(sdEdits);
       clusterID = sig.getClusterID();
       blockpoolID = sig.getBlockpoolID();

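Editor's note: the core of the checkpoint optimization above (HDFS-1458) is that the secondary compares the image digest in the checkpoint signature with the digest of the image it already holds, downloads and reloads the fsimage only on a mismatch, and always fetches the edits. A self-contained toy illustration of that digest-gated download pattern (plain Java, not HDFS code) might look like:

    import java.security.MessageDigest;
    import java.util.Arrays;

    public class DigestGatedDownload {
      static byte[] cachedDigest;   // digest of the copy we already hold; null means nothing cached

      // A download is needed only when the advertised digest differs from the cached one.
      static boolean needsDownload(byte[] advertisedDigest) {
        return cachedDigest == null || !Arrays.equals(cachedDigest, advertisedDigest);
      }

      public static void main(String[] args) throws Exception {
        byte[] image = "fsimage-contents".getBytes("UTF-8");
        byte[] advertised = MessageDigest.getInstance("MD5").digest(image);

        System.out.println("first checkpoint, download? " + needsDownload(advertised));  // true
        cachedDigest = advertised;                                                       // remember it
        System.out.println("second checkpoint, download? " + needsDownload(advertised)); // false
      }
    }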
+ 1 - 1
src/test/aop/build/aop.xml

@@ -21,7 +21,7 @@
   <property name="aspectversion" value="1.6.5"/>
   <!-- TODO this has to be changed synchronously with build.xml version prop.-->
   <!-- this workarounds of test-patch setting its own 'version' -->
-  <property name="project.version" value="0.22.0-SNAPSHOT"/>
+  <property name="project.version" value="0.23.0-SNAPSHOT"/>
 
   <!-- Properties common for all fault injections -->
   <property name="build-fi.dir" value="${basedir}/build-fi"/>

+ 79 - 0
src/test/hdfs/org/apache/hadoop/hdfs/protocol/TestCorruptFileBlocks.java

@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.protocol;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import static org.junit.Assert.assertTrue;
+import org.junit.Test;
+
+import org.apache.hadoop.io.DataOutputBuffer;
+
+public class TestCorruptFileBlocks {
+
+  /**
+   * Serialize the cfb given, deserialize and return the result.
+   */
+  static CorruptFileBlocks serializeAndDeserialize(CorruptFileBlocks cfb) 
+    throws IOException {
+    DataOutputBuffer buf = new DataOutputBuffer();
+    cfb.write(buf);
+
+    byte[] data = buf.getData();
+    DataInputStream input = new DataInputStream(new ByteArrayInputStream(data));
+
+    CorruptFileBlocks result = new CorruptFileBlocks();
+    result.readFields(input);
+
+    return result;
+  }
+
+  /**
+   * Check whether cfb is unchanged after serialization and deserialization.
+   */
+  static boolean checkSerialize(CorruptFileBlocks cfb)
+    throws IOException {
+    return cfb.equals(serializeAndDeserialize(cfb));
+  }
+
+  /**
+   * Test serialization and deserializaton of CorruptFileBlocks.
+   */
+  @Test
+  public void testSerialization() throws IOException {
+    {
+      CorruptFileBlocks cfb = new CorruptFileBlocks();
+      assertTrue("cannot serialize empty CFB", checkSerialize(cfb));
+    }
+
+    {
+      String[] files = new String[0];
+      CorruptFileBlocks cfb = new CorruptFileBlocks(files, "");
+      assertTrue("cannot serialize CFB with empty cookie", checkSerialize(cfb));
+    }
+
+    {
+      String[] files = { "a", "bb", "ccc" };
+      CorruptFileBlocks cfb = new CorruptFileBlocks(files, "test");
+      assertTrue("cannot serialize CFB", checkSerialize(cfb));
+    }
+  }
+}

+ 52 - 0
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java

@@ -870,4 +870,56 @@ public class TestCheckpoint extends TestCase {
     secondary2.shutdown();
     cluster.shutdown();
   }
+  
+  /**
+   * Simulate a secondary node failure to transfer image
+   * back to the name-node.
+   * Used to truncate primary fsimage file.
+   */
+  @SuppressWarnings("deprecation")
+  public void testSecondaryImageDownload(Configuration conf)
+    throws IOException {
+    System.out.println("Starting testSecondaryImageDownload");
+    conf.set(DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY, "0.0.0.0:0");
+    Path dir = new Path("/checkpoint");
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+                                               .numDataNodes(numDatanodes)
+                                               .format(false).build();
+    cluster.waitActive();
+    FileSystem fileSys = cluster.getFileSystem();
+    FSImage image = cluster.getNameNode().getFSImage();
+    try {
+      assertTrue(!fileSys.exists(dir));
+      //
+      // Make the checkpoint
+      //
+      SecondaryNameNode secondary = startSecondaryNameNode(conf);
+      long fsimageLength = FSImage.getImageFile(
+          image.dirIterator(NameNodeDirType.IMAGE).next(),
+          NameNodeFile.IMAGE).length();
+      assertFalse("Image is downloaded", secondary.doCheckpoint());
+
+      // Verify that image file sizes did not change.
+      for (Iterator<StorageDirectory> it = 
+              image.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
+        assertTrue("Image size does not change", FSImage.getImageFile(it.next(), 
+                                NameNodeFile.IMAGE).length() == fsimageLength);
+      }
+
+      // change namespace
+      fileSys.mkdirs(dir);
+      assertTrue("Image is not downloaded", secondary.doCheckpoint());
+
+      for (Iterator<StorageDirectory> it = 
+        image.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
+        assertTrue("Image size increased", FSImage.getImageFile(it.next(), 
+                          NameNodeFile.IMAGE).length() > fsimageLength);
+     }
+
+      secondary.shutdown();
+    } finally {
+      fileSys.close();
+      cluster.shutdown();
+    }
+  }
 }

+ 3 - 3
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCorruptFilesJsp.java

@@ -66,8 +66,8 @@ public class TestCorruptFilesJsp  {
 
       // verify there are not corrupt files
       final NameNode namenode = cluster.getNameNode();
-      Collection<FSNamesystem.CorruptFileBlockInfo> badFiles = namenode
-          .listCorruptFileBlocks("/", null);
+      Collection<FSNamesystem.CorruptFileBlockInfo> badFiles = namenode.
+        getNamesystem().listCorruptFileBlocks("/", null);
       assertTrue("There are " + badFiles.size()
           + " corrupt files, but expecting none", badFiles.size() == 0);
 
@@ -94,7 +94,7 @@ public class TestCorruptFilesJsp  {
       }
 
       // verify if all corrupt files were reported to NN
-      badFiles = namenode.listCorruptFileBlocks("/", null);
+      badFiles = namenode.getNamesystem().listCorruptFileBlocks("/", null);
       assertTrue("Expecting 3 corrupt files, but got " + badFiles.size(),
           badFiles.size() == 3);
 

+ 4 - 3
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java

@@ -39,6 +39,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.CorruptFileBlocks;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -483,14 +484,14 @@ public class TestFsck extends TestCase {
 
       // wait for the namenode to see the corruption
       final NameNode namenode = cluster.getNameNode();
-      Collection<FSNamesystem.CorruptFileBlockInfo> corruptFileBlocks = namenode
+      CorruptFileBlocks corruptFileBlocks = namenode
           .listCorruptFileBlocks("/corruptData", null);
-      int numCorrupt = corruptFileBlocks.size();
+      int numCorrupt = corruptFileBlocks.getFiles().length;
       while (numCorrupt == 0) {
         Thread.sleep(1000);
         corruptFileBlocks = namenode
             .listCorruptFileBlocks("/corruptData", null);
-        numCorrupt = corruptFileBlocks.size();
+        numCorrupt = corruptFileBlocks.getFiles().length;
       }
       outStr = runFsck(conf, -1, true, "/corruptData", "-list-corruptfileblocks");
       System.out.println("2. bad fsck out: " + outStr);

+ 94 - 16
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java

@@ -30,10 +30,12 @@ import junit.framework.TestCase;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.CorruptFileBlocks;
 import org.apache.hadoop.hdfs.BlockMissingException;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 
 /**
  * This class tests the listCorruptFileBlocks API.
@@ -64,8 +66,8 @@ public class TestListCorruptFileBlocks extends TestCase {
 
       // fetch bad file list from namenode. There should be none.
       final NameNode namenode = cluster.getNameNode();
-      Collection<FSNamesystem.CorruptFileBlockInfo> badFiles = namenode
-          .listCorruptFileBlocks("/", null);
+      Collection<FSNamesystem.CorruptFileBlockInfo> badFiles = namenode.
+        getNamesystem().listCorruptFileBlocks("/", null);
       assertTrue("Namenode has " + badFiles.size()
           + " corrupt files. Expecting None.", badFiles.size() == 0);
 
@@ -107,7 +109,7 @@ public class TestListCorruptFileBlocks extends TestCase {
       }
 
       // fetch bad file list from namenode. There should be one file.
-      badFiles = namenode.listCorruptFileBlocks("/", null);
+      badFiles = namenode.getNamesystem().listCorruptFileBlocks("/", null);
       LOG.info("Namenode has bad files. " + badFiles.size());
       assertTrue("Namenode has " + badFiles.size() + " bad files. Expecting 1.",
           badFiles.size() == 1);
@@ -134,8 +136,8 @@ public class TestListCorruptFileBlocks extends TestCase {
       util.createFiles(fs, "/corruptData");
 
       final NameNode namenode = cluster.getNameNode();
-      Collection<FSNamesystem.CorruptFileBlockInfo> corruptFileBlocks = namenode
-          .listCorruptFileBlocks("/corruptData", null);
+      Collection<FSNamesystem.CorruptFileBlockInfo> corruptFileBlocks = 
+        namenode.getNamesystem().listCorruptFileBlocks("/corruptData", null);
       int numCorrupt = corruptFileBlocks.size();
       assertTrue(numCorrupt == 0);
       // delete the blocks
@@ -161,11 +163,12 @@ public class TestListCorruptFileBlocks extends TestCase {
       }
 
       int count = 0;
-      corruptFileBlocks = namenode.listCorruptFileBlocks("/corruptData", null);
+      corruptFileBlocks = namenode.getNamesystem().
+        listCorruptFileBlocks("/corruptData", null);
       numCorrupt = corruptFileBlocks.size();
       while (numCorrupt < 3) {
         Thread.sleep(1000);
-        corruptFileBlocks = namenode
+        corruptFileBlocks = namenode.getNamesystem()
             .listCorruptFileBlocks("/corruptData", null);
         numCorrupt = corruptFileBlocks.size();
         count++;
@@ -180,7 +183,8 @@ public class TestListCorruptFileBlocks extends TestCase {
       FSNamesystem.CorruptFileBlockInfo[] cfb = corruptFileBlocks
           .toArray(new FSNamesystem.CorruptFileBlockInfo[0]);
       // now get the 2nd and 3rd file that is corrupt
-      Collection<FSNamesystem.CorruptFileBlockInfo> nextCorruptFileBlocks = namenode
+      Collection<FSNamesystem.CorruptFileBlockInfo> nextCorruptFileBlocks =
+        namenode.getNamesystem()
           .listCorruptFileBlocks("/corruptData", cfb[0].block.getBlockName());
       FSNamesystem.CorruptFileBlockInfo[] ncfb = nextCorruptFileBlocks
           .toArray(new FSNamesystem.CorruptFileBlockInfo[0]);
@@ -189,14 +193,16 @@ public class TestListCorruptFileBlocks extends TestCase {
       assertTrue(ncfb[0].block.getBlockName()
           .equalsIgnoreCase(cfb[1].block.getBlockName()));
 
-      corruptFileBlocks = namenode.listCorruptFileBlocks("/corruptData",
+      corruptFileBlocks = 
+        namenode.getNamesystem().listCorruptFileBlocks("/corruptData",
           ncfb[1].block.getBlockName());
       numCorrupt = corruptFileBlocks.size();
       assertTrue(numCorrupt == 0);
       // Do a listing on a dir which doesn't have any corrupt blocks and
       // validate
       util.createFiles(fs, "/goodData");
-      corruptFileBlocks = namenode.listCorruptFileBlocks("/goodData", null);
+      corruptFileBlocks = 
+        namenode.getNamesystem().listCorruptFileBlocks("/goodData", null);
       numCorrupt = corruptFileBlocks.size();
       assertTrue(numCorrupt == 0);
       util.cleanup(fs, "/corruptData");
@@ -207,7 +213,76 @@ public class TestListCorruptFileBlocks extends TestCase {
       }
     }
   }
-  
+
+  /**
+   * test listCorruptFileBlocks in DistributedFileSystem
+   */ 
+  public void testlistCorruptFileBlocksDFS() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setLong("dfs.blockreport.intervalMsec", 1000);
+    conf.setInt("dfs.datanode.directoryscan.interval", 1); // datanode scans
+                                                           // directories
+    FileSystem fs = null;
+
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).build();
+      cluster.waitActive();
+      fs = cluster.getFileSystem();
+      DistributedFileSystem dfs = (DistributedFileSystem) fs;
+      DFSTestUtil util = new DFSTestUtil("testGetCorruptFiles", 3, 1, 1024);
+      util.createFiles(fs, "/corruptData");
+
+      final NameNode namenode = cluster.getNameNode();
+      CorruptFileBlocks corruptFileBlocks = 
+        dfs.listCorruptFileBlocks("/corruptData", null);
+      int numCorrupt = corruptFileBlocks.getFiles().length;
+      assertTrue(numCorrupt == 0);
+      // delete the blocks
+      File baseDir = new File(System.getProperty("test.build.data",
+          "build/test/data"), "dfs/data");
+      for (int i = 0; i < 8; i++) {
+        File data_dir = new File(baseDir, "data" + (i + 1)
+            + MiniDFSCluster.FINALIZED_DIR_NAME);
+        File[] blocks = data_dir.listFiles();
+        if (blocks == null)
+          continue;
+        // assertTrue("Blocks do not exist in data-dir", (blocks != null) &&
+        // (blocks.length > 0));
+        for (int idx = 0; idx < blocks.length; idx++) {
+          if (!blocks[idx].getName().startsWith("blk_")) {
+            continue;
+          }
+          LOG.info("Deliberately removing file " + blocks[idx].getName());
+          assertTrue("Cannot remove file.", blocks[idx].delete());
+          // break;
+        }
+      }
+
+      int count = 0;
+      corruptFileBlocks = dfs.listCorruptFileBlocks("/corruptData", null);
+      numCorrupt = corruptFileBlocks.getFiles().length;
+      while (numCorrupt < 3) {
+        Thread.sleep(1000);
+        corruptFileBlocks = dfs.listCorruptFileBlocks("/corruptData", null);
+        numCorrupt = corruptFileBlocks.getFiles().length;
+        count++;
+        if (count > 30)
+          break;
+      }
+      // Validate we get all the corrupt files
+      LOG.info("Namenode has bad files. " + numCorrupt);
+      assertTrue(numCorrupt == 3);
+
+      util.cleanup(fs, "/corruptData");
+      util.cleanup(fs, "/goodData");
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+    
   /** check if NN.listCorruptFiles() returns the right limit */
   public void testMaxCorruptFiles() throws Exception {
     MiniDFSCluster cluster = null;
@@ -228,8 +303,8 @@ public class TestListCorruptFileBlocks extends TestCase {
 
       // verify that there are no bad blocks.
       final NameNode namenode = cluster.getNameNode();
-      Collection<FSNamesystem.CorruptFileBlockInfo> badFiles = namenode
-          .listCorruptFileBlocks("/srcdat2", null);
+      Collection<FSNamesystem.CorruptFileBlockInfo> badFiles = namenode.
+        getNamesystem().listCorruptFileBlocks("/srcdat2", null);
       assertTrue("Namenode has " + badFiles.size() + " corrupt files. Expecting none.",
           badFiles.size() == 0);
 
@@ -253,14 +328,17 @@ public class TestListCorruptFileBlocks extends TestCase {
         }
       }
 
-      badFiles = namenode.listCorruptFileBlocks("/srcdat2", null);
+      badFiles = 
+        namenode.getNamesystem().listCorruptFileBlocks("/srcdat2", null);
         
        while (badFiles.size() < maxCorruptFileBlocks) {
         LOG.info("# of corrupt files is: " + badFiles.size());
         Thread.sleep(10000);
-        badFiles = namenode.listCorruptFileBlocks("/srcdat2", null);
+        badFiles = namenode.getNamesystem().
+          listCorruptFileBlocks("/srcdat2", null);
       }
-      badFiles = namenode.listCorruptFileBlocks("/srcdat2", null); 
+      badFiles = namenode.getNamesystem().
+        listCorruptFileBlocks("/srcdat2", null); 
       LOG.info("Namenode has bad files. " + badFiles.size());
       assertTrue("Namenode has " + badFiles.size() + " bad files. Expecting " + 
           maxCorruptFileBlocks + ".",

+ 1 - 1
src/webapps/hdfs/corrupt_files.jsp

@@ -34,7 +34,7 @@
   String namenodeLabel = nn.getNameNodeAddress().getHostName() + ":"
       + nn.getNameNodeAddress().getPort();
   Collection<FSNamesystem.CorruptFileBlockInfo> corruptFileBlocks = 
-	nn.listCorruptFileBlocks("/", null);
+	fsn.listCorruptFileBlocks("/", null);
   int corruptFileCount = corruptFileBlocks.size();
 %>