
HADOOP-498. Re-implement DFS integrity checker to run server-side for much improved performance. Contributed by Milind.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@464710 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting, commit a4f6154c0a
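In outline: the old DFSck walked the whole namespace from the client, listing and opening every file over RPC and contacting datanodes directly, whereas the new version delegates the traversal to a servlet inside the namenode's Jetty info server and simply streams back the plain-text report. A minimal sketch of the new interaction, assuming a namenode reachable at the placeholder host nn.example.com on the default dfs.info.port of 50070:

    import java.io.InputStreamReader;
    import java.io.Reader;
    import java.net.URL;

    // Minimal sketch: fetch an fsck report the same way the rewritten DFSck does.
    // "nn.example.com" is a placeholder host; 50070 is the default dfs.info.port.
    public class FsckExample {
      public static void main(String[] args) throws Exception {
        URL fsck = new URL("http://nn.example.com:50070/fsck?path=%2F&files");
        Reader in = new InputStreamReader(fsck.openConnection().getInputStream(), "UTF-8");
        try {
          int c;
          while ((c = in.read()) != -1) {
            System.out.print((char) c); // the report is generated server-side and streamed back
          }
        } finally {
          in.close();
        }
      }
    }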

+ 3 - 0
CHANGES.txt

@@ -11,6 +11,9 @@ Trunk (unreleased changes)
     attempts from 'info' to 'debug', so they are not normally shown.
     (Konstantin Shvachko via cutting)
 
+ 3. HADOOP-498.  Re-implement DFS integrity checker to run server-side,
+    for much improved performance.  (Milind Bhandarkar via cutting)
+
 
 Release 0.7.1 - 2006-10-11
 

+ 43 - 530
src/java/org/apache/hadoop/dfs/DFSck.java

@@ -15,22 +15,16 @@
  */
 package org.apache.hadoop.dfs;
 
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.ArrayList;
-import java.util.Random;
-import java.util.TreeSet;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.net.URLConnection;
+import java.net.URLEncoder;
 
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSOutputStream;
-import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.util.ToolBase;
 
 /**
@@ -60,361 +54,32 @@ import org.apache.hadoop.util.ToolBase;
 public class DFSck extends ToolBase {
   private static final Log LOG = LogFactory.getLog(DFSck.class.getName());
 
-  /** Don't attempt any fixing . */
-  public static final int FIXING_NONE = 0;
-  /** Move corrupted files to /lost+found . */
-  public static final int FIXING_MOVE = 1;
-  /** Delete corrupted files. */
-  public static final int FIXING_DELETE = 2;
-  
-  private DFSClient dfs;
-  private UTF8 lostFound = null;
-  private boolean lfInited = false;
-  private boolean lfInitedOk = false;
-  private boolean showFiles = false;
-  private boolean showBlocks = false;
-  private boolean showLocations = false;
-  private int fixing;
- 
-  DFSck() {
-  }
+  DFSck() {}
   
   /**
    * Filesystem checker.
    * @param conf current Configuration
-   * @param fixing one of pre-defined values
-   * @param showFiles show each file being checked
-   * @param showBlocks for each file checked show its block information
-   * @param showLocations for each block in each file show block locations
    * @throws Exception
    */
-  public DFSck(Configuration conf, int fixing, boolean showFiles, boolean showBlocks, boolean showLocations) throws Exception {
+  public DFSck(Configuration conf) throws Exception {
     setConf(conf);
-    init(fixing, showFiles, showBlocks, showLocations);
-  }
-  
-  public void init(int fixing, boolean showFiles, 
-          boolean showBlocks, boolean showLocations) throws IOException {
-      String fsName = conf.get("fs.default.name", "local");
-      if (fsName.equals("local")) {
-        throw new IOException("This tool only checks DFS, but your config uses 'local' FS.");
-      }
-      this.dfs = new DFSClient(DataNode.createSocketAddr(fsName), conf);
-      this.fixing = fixing;
-      this.showFiles = showFiles;
-      this.showBlocks = showBlocks;
-      this.showLocations = showLocations;
-  }
-  
-  /**
-   * Check files on DFS, starting from the indicated path.
-   * @param path starting point
-   * @return result of checking
-   * @throws Exception
-   */
-  public Result fsck(String path) throws Exception {
-    DFSFileInfo[] files = dfs.listPaths(new UTF8(path));
-    Result res = new Result();
-    res.setReplication(dfs.getDefaultReplication());
-    for (int i = 0; i < files.length; i++) {
-      check(files[i], res);
-    }
-    return res;
-  }
-  
-  private void check(DFSFileInfo file, Result res) throws Exception {
-    if (file.isDir()) {
-      if (showFiles)
-        System.out.println(file.getPath() + " <dir>");
-      res.totalDirs++;
-      DFSFileInfo[] files = dfs.listPaths(new UTF8(file.getPath()));
-      for (int i = 0; i < files.length; i++) {
-        check(files[i], res);
-      }
-      return;
-    }
-    res.totalFiles++;
-    res.totalSize += file.getLen();
-    LocatedBlock[] blocks = dfs.namenode.open(file.getPath());
-    res.totalBlocks += blocks.length;
-    if (showFiles) {
-      System.out.print(file.getPath() + " " + file.getLen() + ", " + blocks.length + " block(s): ");
-    } else {
-      System.out.print('.');
-      System.out.flush();
-      if (res.totalFiles % 100 == 0) System.out.println();
-    }
-    int missing = 0;
-    long missize = 0;
-    StringBuffer report = new StringBuffer();
-    for (int i = 0; i < blocks.length; i++) {
-      Block block = blocks[i].getBlock();
-      long id = block.getBlockId();
-      DatanodeInfo[] locs = blocks[i].getLocations();
-      short targetFileReplication = file.getReplication();
-      if (locs.length > targetFileReplication) res.overReplicatedBlocks += (locs.length - targetFileReplication);
-      if (locs.length < targetFileReplication && locs.length > 0) res.underReplicatedBlocks += (targetFileReplication - locs.length);
-      report.append(i + ". " + id + " len=" + block.getNumBytes());
-      if (locs == null || locs.length == 0) {
-        report.append(" MISSING!");
-        res.addMissing(block.getBlockName(), block.getNumBytes());
-        missing++;
-        missize += block.getNumBytes();
-      } else {
-        report.append(" repl=" + locs.length);
-        if (showLocations) {
-          StringBuffer sb = new StringBuffer("[");
-          for (int j = 0; j < locs.length; j++) {
-            if (j > 0) sb.append(", ");
-            sb.append(locs[j]);
-          }
-          sb.append(']');
-          report.append(" " + sb.toString());
-        }
-      }
-      report.append('\n');
-    }
-    if (missing > 0) {
-      if (!showFiles)
-        System.out.println("\nMISSING " + missing + " blocks of total size " + missize + " B");
-      res.corruptFiles++;
-      switch (fixing) {
-        case FIXING_NONE: // do nothing
-          System.err.println("\n - ignoring corrupted " + file.getPath());
-          break;
-        case FIXING_MOVE:
-          System.err.println("\n - moving to /lost+found: " + file.getPath());
-          lostFoundMove(file, blocks);
-          break;
-        case FIXING_DELETE:
-          System.err.println("\n - deleting corrupted " + file.getPath());
-          dfs.delete(new UTF8(file.getPath()));
-      }
-    }
-    if (showFiles) {
-      if (missing > 0) {
-        System.out.println(" MISSING " + missing + " blocks of total size " + missize + " B");
-      } else System.out.println(" OK");
-      if (showBlocks) System.out.println(report.toString());
-    }
-  }
-  
-  private void lostFoundMove(DFSFileInfo file, LocatedBlock[] blocks) {
-    if (!lfInited) {
-      lostFoundInit();
-    }
-    if (!lfInitedOk) {
-      return;
-    }
-    UTF8 target = new UTF8(lostFound.toString() + file.getPath());
-    String errmsg = "Failed to move " + file.getPath() + " to /lost+found";
-    try {
-      if (!dfs.mkdirs(target)) {
-        System.err.println(errmsg);
-        return;
-      }
-      // create chains
-      int chain = 0;
-      FSOutputStream fos = null;
-      for (int i = 0; i < blocks.length; i++) {
-        LocatedBlock lblock = blocks[i];
-        DatanodeInfo[] locs = lblock.getLocations();
-        if (locs == null || locs.length == 0) {
-          if (fos != null) {
-            fos.flush();
-            fos.close();
-            fos = null;
-          }
-          continue;
-        }
-        if (fos == null) {
-          fos = dfs.create(new UTF8(target.toString() + "/" + chain), true);
-          if (fos != null) chain++;
-        }
-        if (fos == null) {
-          System.err.println(errmsg + ": could not store chain " + chain);
-          // perhaps we should bail out here...
-          // return;
-          continue;
-        }
-        
-        // copy the block. It's a pity it's not abstracted from DFSInputStream ...
-        try {
-          copyBlock(lblock, fos);
-        } catch (Exception e) {
-          e.printStackTrace();
-          // something went wrong copying this block...
-          System.err.println(" - could not copy block " + lblock.getBlock().getBlockName() + " to " + target);
-          fos.flush();
-          fos.close();
-          fos = null;
-        }
-      }
-      if (fos != null) fos.close();
-      System.err.println("\n - moved corrupted file " + file.getPath() + " to /lost+found");
-      dfs.delete(new UTF8(file.getPath()));
-    } catch (Exception e) {
-      e.printStackTrace();
-      System.err.println(errmsg + ": " + e.getMessage());
-    }
-  }
-  
-  /*
-   * XXX (ab) Bulk of this method is copied verbatim from {@link DFSClient}, which is
-   * bad. Both places should be refactored to provide a method to copy blocks
-   * around.
-   */
-  private void copyBlock(LocatedBlock lblock, FSOutputStream fos) throws Exception {
-    int failures = 0;
-    InetSocketAddress targetAddr = null;
-    TreeSet deadNodes = new TreeSet();
-    Socket s = null;
-    DataInputStream in = null;
-    DataOutputStream out = null;
-    while (s == null) {
-        DatanodeInfo chosenNode;
-
-        try {
-            chosenNode = bestNode(lblock.getLocations(), deadNodes);
-            targetAddr = DataNode.createSocketAddr(chosenNode.getName());
-        } catch (IOException ie) {
-            if (failures >= DFSClient.MAX_BLOCK_ACQUIRE_FAILURES) {
-                throw new IOException("Could not obtain block " + lblock);
-            }
-            LOG.info("Could not obtain block from any node:  " + ie);
-            try {
-                Thread.sleep(10000);
-            } catch (InterruptedException iex) {
-            }
-            deadNodes.clear();
-            failures++;
-            continue;
-        }
-        try {
-            s = new Socket();
-            s.connect(targetAddr, FSConstants.READ_TIMEOUT);
-            s.setSoTimeout(FSConstants.READ_TIMEOUT);
-
-            //
-            // Xmit header info to datanode
-            //
-            out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
-            out.write(FSConstants.OP_READSKIP_BLOCK);
-            lblock.getBlock().write(out);
-            out.writeLong(0L);
-            out.flush();
-
-            //
-            // Get bytes in block, set streams
-            //
-            in = new DataInputStream(new BufferedInputStream(s.getInputStream()));
-            long curBlockSize = in.readLong();
-            long amtSkipped = in.readLong();
-            if (curBlockSize != lblock.getBlock().len) {
-                throw new IOException("Recorded block size is " + lblock.getBlock().len + ", but datanode reports size of " + curBlockSize);
-            }
-            if (amtSkipped != 0L) {
-                throw new IOException("Asked for offset of " + 0L + ", but only received offset of " + amtSkipped);
-            }
-        } catch (IOException ex) {
-            // Put chosen node into dead list, continue
-            LOG.info("Failed to connect to " + targetAddr + ":" + ex);
-            deadNodes.add(chosenNode);
-            if (s != null) {
-                try {
-                    s.close();
-                } catch (IOException iex) {
-                }                        
-            }
-            s = null;
-        }
-    }
-    if (in == null) {
-      throw new Exception("Could not open data stream for " + lblock.getBlock().getBlockName());
-    }
-    byte[] buf = new byte[1024];
-    int cnt = 0;
-    boolean success = true;
-    try {
-      while ((cnt = in.read(buf)) > 0) {
-        fos.write(buf, 0, cnt);
-      }
-    } catch (Exception e) {
-      e.printStackTrace();
-      success = false;
-    } finally {
-      try {in.close(); } catch (Exception e1) {}
-      try {out.close(); } catch (Exception e1) {}
-      try {s.close(); } catch (Exception e1) {}
-    }
-    if (!success)
-      throw new Exception("Could not copy block data for " + lblock.getBlock().getBlockName());
   }
   
-  /*
-   * XXX (ab) See comment above for copyBlock().
-   * 
-   * Pick the best node from which to stream the data.
-   * That's the local one, if available.
-   */
-  Random r = new Random();
-  private DatanodeInfo bestNode(DatanodeInfo nodes[], TreeSet deadNodes) throws IOException {
-      if ((nodes == null) || 
-          (nodes.length - deadNodes.size() < 1)) {
-          throw new IOException("No live nodes contain current block");
-      }
-      DatanodeInfo chosenNode = null;
-      for (int i = 0; i < nodes.length; i++) {
-          if (deadNodes.contains(nodes[i])) {
-              continue;
-          }
-          String nodename = nodes[i].getName();
-          int colon = nodename.indexOf(':');
-          if (colon >= 0) {
-              nodename = nodename.substring(0, colon);
-          }
-          if (dfs.localName.equals(nodename)) {
-              chosenNode = nodes[i];
-              break;
-          }
-      }
-      if (chosenNode == null) {
-          do {
-              chosenNode = nodes[Math.abs(r.nextInt()) % nodes.length];
-          } while (deadNodes.contains(chosenNode));
-      }
-      return chosenNode;
-  }
-
-  private void lostFoundInit() {
-    lfInited = true;
-    try {
-      UTF8 lfName = new UTF8("/lost+found");
-      // check that /lost+found exists
-      if (!dfs.exists(lfName)) {
-        lfInitedOk = dfs.mkdirs(lfName);
-        lostFound = lfName;
-      } else if (!dfs.isDirectory(lfName)) {
-        System.err.println("Cannot use /lost+found : a regular file with this name exists.");
-        lfInitedOk = false;
-      } else { // exists and isDirectory
-        lostFound = lfName;
-        lfInitedOk = true;
-      }
-    } catch (Exception e) {
-      e.printStackTrace();
-      lfInitedOk = false;
-    }
-    if (lostFound == null) {
-      System.err.println("Cannot initialize /lost+found .");
-      lfInitedOk = false;
-    }
+  private String getInfoServer() throws IOException {
+    String fsName = conf.get("fs.default.name", "local");
+    if (fsName.equals("local")) {
+      throw new IOException("This tool only checks DFS, but your config uses 'local' FS.");
+    }
+    String[] splits = fsName.split(":", 2);
+    int infoPort = conf.getInt("dfs.info.port", 50070);
+    return splits[0]+":"+infoPort;
   }
   
   /**
    * @param args
    */
   public int run(String[] args) throws Exception {
+    String fsName = getInfoServer();
     if (args.length == 0) {
       System.err.println("Usage: DFSck <path> [-move | -delete] [-files] [-blocks [-locations]]");
       System.err.println("\t<path>\tstart checking from this path");
@@ -425,26 +90,33 @@ public class DFSck extends ToolBase {
       System.err.println("\t-locations\tprint out locations for every block");
       System.err.println("\t-locations\tprint out locations for every block");
       return -1;
       return -1;
     }
     }
-    String path = args[0];
-    boolean showFiles = false;
-    boolean showBlocks = false;
-    boolean showLocations = false;
-    int fixing = FIXING_NONE;
-    for (int i = 1; i < args.length; i++) {
-      if (args[i].equals("-files")) showFiles = true;
-      if (args[i].equals("-blocks")) showBlocks = true;
-      if (args[i].equals("-locations")) showLocations = true;
-      if (args[i].equals("-move")) fixing = FIXING_MOVE;
-      if (args[i].equals("-delete")) fixing = FIXING_DELETE;
-    }
-    init(fixing, showFiles, showBlocks, showLocations);
-    Result res = fsck(path);
-    System.out.println();
-    System.out.println(res);
-    if (res.isHealthy()) {
-      System.out.println("\n\nThe filesystem under path '" + args[0] + "' is HEALTHY");
-    } else {
-      System.out.println("\n\nThe filesystem under path '" + args[0] + "' is CORRUPT");
+    StringBuffer url = new StringBuffer("http://"+fsName+"/fsck?path=");
+    String dir = "/";
+    // find top-level dir first
+    for (int idx = 0; idx < args.length; idx++) {
+      if (!args[idx].startsWith("-")) { dir = args[idx]; break; }
+    }
+    url.append(URLEncoder.encode(dir, "UTF-8"));
+    for (int idx = 1; idx < args.length; idx++) {
+      if (args[idx].equals("-move")) { url.append("&move"); }
+      if (args[idx].equals("-delete")) { url.append("&delete"); }
+      if (args[idx].equals("-files")) { url.append("&files"); }
+      if (args[idx].equals("-blocks")) { url.append("&blocks"); }
+      if (args[idx].equals("-locations")) { url.append("&locations"); }
+    }
+    URL path = new URL(url.toString());
+    URLConnection connection = path.openConnection();
+    InputStream stream = connection.getInputStream();
+    InputStreamReader input =
+        new InputStreamReader(stream, "UTF-8");
+    try {
+      int c = input.read();
+      while (c != -1) {
+        System.out.print((char) c);
+        c = input.read();
+      }
+    } finally {
+      input.close();
     }
     return 0;
   }
@@ -453,163 +125,4 @@ public class DFSck extends ToolBase {
       int res = new DFSck().doMain(new Configuration(), args);
       System.exit(res);
   }
-
-  /**
-   * Result of checking, plus overall DFS statistics.
-   * @author Andrzej Bialecki
-   */
-  public static class Result {
-    private ArrayList missingIds = new ArrayList();
-    private long missingSize = 0L;
-    private long corruptFiles = 0L;
-    private long overReplicatedBlocks = 0L;
-    private long underReplicatedBlocks = 0L;
-    private int replication = 0;
-    private long totalBlocks = 0L;
-    private long totalFiles = 0L;
-    private long totalDirs = 0L;
-    private long totalSize = 0L;
-    
-    /**
-     * DFS is considered healthy if there are no missing blocks.
-     * @return
-     */
-    public boolean isHealthy() {
-      return missingIds.size() == 0;
-    }
-    
-    /** Add a missing block name, plus its size. */
-    public void addMissing(String id, long size) {
-      missingIds.add(id);
-      missingSize += size;
-    }
-    
-    /** Return a list of missing block names (as list of Strings). */
-    public ArrayList getMissingIds() {
-      return missingIds;
-    }
-
-    /** Return total size of missing data, in bytes. */
-    public long getMissingSize() {
-      return missingSize;
-    }
-
-    public void setMissingSize(long missingSize) {
-      this.missingSize = missingSize;
-    }
-
-    /** Return the number of over-replicsted blocks. */
-    public long getOverReplicatedBlocks() {
-      return overReplicatedBlocks;
-    }
-
-    public void setOverReplicatedBlocks(long overReplicatedBlocks) {
-      this.overReplicatedBlocks = overReplicatedBlocks;
-    }
-
-    /** Return the actual replication factor. */
-    public float getReplicationFactor() {
-      if (totalBlocks != 0)
-        return (float)(totalBlocks * replication + overReplicatedBlocks - underReplicatedBlocks) / (float)totalBlocks;
-      else return 0.0f;
-    }
-
-    /** Return the number of under-replicated blocks. Note: missing blocks are not counted here.*/
-    public long getUnderReplicatedBlocks() {
-      return underReplicatedBlocks;
-    }
-
-    public void setUnderReplicatedBlocks(long underReplicatedBlocks) {
-      this.underReplicatedBlocks = underReplicatedBlocks;
-    }
-
-    /** Return total number of directories encountered during this scan. */
-    public long getTotalDirs() {
-      return totalDirs;
-    }
-
-    public void setTotalDirs(long totalDirs) {
-      this.totalDirs = totalDirs;
-    }
-
-    /** Return total number of files encountered during this scan. */
-    public long getTotalFiles() {
-      return totalFiles;
-    }
-
-    public void setTotalFiles(long totalFiles) {
-      this.totalFiles = totalFiles;
-    }
-
-    /** Return total size of scanned data, in bytes. */
-    public long getTotalSize() {
-      return totalSize;
-    }
-
-    public void setTotalSize(long totalSize) {
-      this.totalSize = totalSize;
-    }
-
-    /** Return the intended replication factor, against which the over/under-
-     * replicated blocks are counted. Note: this values comes from the current
-     * Configuration supplied for the tool, so it may be different from the
-     * value in DFS Configuration.
-     */
-    public int getReplication() {
-      return replication;
-    }
-
-    public void setReplication(int replication) {
-      this.replication = replication;
-    }
-
-    /** Return the total number of blocks in the scanned area. */
-    public long getTotalBlocks() {
-      return totalBlocks;
-    }
-
-    public void setTotalBlocks(long totalBlocks) {
-      this.totalBlocks = totalBlocks;
-    }
-    
-    public String toString() {
-      StringBuffer res = new StringBuffer();
-      res.append("Status: " + (isHealthy() ? "HEALTHY" : "CORRUPT"));
-      res.append("\n Total size:\t" + totalSize + " B");
-      res.append("\n Total blocks:\t" + totalBlocks);
-      if (totalBlocks > 0) res.append(" (avg. block size "
-              + (totalSize / totalBlocks) + " B)");
-      res.append("\n Total dirs:\t" + totalDirs);
-      res.append("\n Total files:\t" + totalFiles);
-      if (missingSize > 0) {
-        res.append("\n  ********************************");
-        res.append("\n  CORRUPT FILES:\t" + corruptFiles);
-        res.append("\n  MISSING BLOCKS:\t" + missingIds.size());
-        res.append("\n  MISSING SIZE:\t\t" + missingSize + " B");
-        res.append("\n  ********************************");
-      }
-      res.append("\n Over-replicated blocks:\t" + overReplicatedBlocks);
-      if (totalBlocks > 0) res.append(" ("
-              + ((float)(overReplicatedBlocks * 100) / (float)totalBlocks)
-              + " %)");
-      res.append("\n Under-replicated blocks:\t" + underReplicatedBlocks);
-      if (totalBlocks > 0) res.append(" ("
-              + ((float)(underReplicatedBlocks * 100) / (float)totalBlocks)
-              + " %)");
-      res.append("\n Target replication factor:\t" + replication);
-      res.append("\n Real replication factor:\t" + getReplicationFactor());
-      return res.toString();
-    }
-
-    /** Return the number of currupted files. */
-    public long getCorruptFiles() {
-      return corruptFiles;
-    }
-
-    public void setCorruptFiles(long corruptFiles) {
-      this.corruptFiles = corruptFiles;
-    }
-  }
-
-
 }
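For concreteness, with the rewritten client a command line such as bin/hadoop fsck /user/foo -files -blocks -locations now turns into a single HTTP GET (the namenode host below is a placeholder):

    http://nn.example.com:50070/fsck?path=%2Fuser%2Ffoo&files&blocks&locations

The options travel as value-less query parameters; the server-side checker only tests for the presence of each key.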

+ 37 - 1
src/java/org/apache/hadoop/dfs/FSNamesystem.java

@@ -26,6 +26,13 @@ import org.apache.hadoop.fs.Path;
 import java.io.*;
 import java.net.InetSocketAddress;
 import java.util.*;
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskTracker;
 
 /***************************************************
  * FSNamesystem does the actual bookkeeping work for the
@@ -190,7 +197,7 @@ class FSNamesystem implements FSConstants {
      * dir is where the filesystem directory state 
      * is stored
      */
-    public FSNamesystem(File dir, Configuration conf) throws IOException {
+    public FSNamesystem(File dir, NameNode nn, Configuration conf) throws IOException {
         fsNamesystemObject = this;
         InetSocketAddress addr = DataNode.createSocketAddr(conf.get("fs.default.name", "local"));
         this.maxReplication = conf.getInt("dfs.replication.max", 512);
@@ -229,6 +236,10 @@ class FSNamesystem implements FSConstants {
         this.infoPort = conf.getInt("dfs.info.port", 50070);
         this.infoBindAddress = conf.get("dfs.info.bindAddress", "0.0.0.0");
         this.infoServer = new StatusHttpServer("dfs",infoBindAddress, infoPort, false);
+        this.infoServer.setAttribute("name.system", this);
+        this.infoServer.setAttribute("name.node", nn);
+        this.infoServer.setAttribute("name.conf", conf);
+        this.infoServer.addServlet("fsck", "/fsck", FsckServlet.class);
         this.infoServer.start();
     }
     /** Return the FSNamesystem object
@@ -2464,4 +2475,29 @@ class FSNamesystem implements FSConstants {
         return "";
         return "";
       return safeMode.getTurnOffTip();
       return safeMode.getTurnOffTip();
     }
     }
+    
+    /**
+     * This servlet runs inside the namesystem's Jetty info server to perform fsck on the namenode.
+     * @author Milind Bhandarkar
+     */
+    public static class FsckServlet extends HttpServlet {
+      public void doGet(HttpServletRequest request,
+          HttpServletResponse response
+          ) throws ServletException, IOException {
+        Map<String,String[]> pmap = request.getParameterMap();
+        try {
+          ServletContext context = getServletContext();
+          NameNode nn = (NameNode) context.getAttribute("name.node");
+          Configuration conf = (Configuration) context.getAttribute("name.conf");
+          NamenodeFsck fscker = new NamenodeFsck(conf, nn, pmap, response);
+          fscker.fsck();
+        } catch (IOException ie) {
+          StringUtils.stringifyException(ie);
+          LOG.warn(ie);
+          String errMsg = "Fsck on path " + pmap.get("path") + " failed.";
+          response.sendError(HttpServletResponse.SC_GONE, errMsg);
+          throw ie;
+        }
+      }
+    }
 }
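Because the checker now sits behind the namenode's status HTTP server, the same report is also viewable in an ordinary web browser at the /fsck path on the info port; if the check fails with an IOException, the servlet logs it and replies with HTTP 410 (SC_GONE) plus a short error message.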

+ 2 - 4
src/java/org/apache/hadoop/dfs/NameNode.java

@@ -23,8 +23,6 @@ import org.apache.hadoop.ipc.*;
 import org.apache.hadoop.conf.*;
 
 import java.io.*;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
 
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.Metrics;
@@ -129,7 +127,7 @@ public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
      * Create a NameNode at the specified location and start it.
      */
     public NameNode(File dir, String bindAddress, int port, Configuration conf) throws IOException {
-        this.namesystem = new FSNamesystem(dir, conf);
+        this.namesystem = new FSNamesystem(dir, this, conf);
         this.handlerCount = conf.getInt("dfs.namenode.handler.count", 10);
         this.server = RPC.getServer(this, bindAddress, port, handlerCount, false, conf);
         this.server.start();
@@ -546,7 +544,7 @@ public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
       if( version != DFS_CURRENT_VERSION )
         throw new IncorrectVersionException( version, "data node" );
     }
-
+    
     /**
      */
     public static void main(String argv[]) throws Exception {

+ 593 - 0
src/java/org/apache/hadoop/dfs/NamenodeFsck.java

@@ -0,0 +1,593 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeSet;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSOutputStream;
+import org.apache.hadoop.io.UTF8;
+
+
+/**
+ * This class provides rudimentary checking of DFS volumes for errors and
+ * sub-optimal conditions.
+ * <p>The tool scans all files and directories, starting from an indicated
+ *  root path. The following abnormal conditions are detected and handled:</p>
+ * <ul>
+ * <li>files with blocks that are completely missing from all datanodes.<br/>
+ * In this case the tool can perform one of the following actions:
+ *  <ul>
+ *      <li>none ({@link #FIXING_NONE})</li>
+ *      <li>move corrupted files to /lost+found directory on DFS
+ *      ({@link #FIXING_MOVE}). Remaining data blocks are saved as
+ *      block chains, representing the longest consecutive series of valid blocks.</li>
+ *      <li>delete corrupted files ({@link #FIXING_DELETE})</li>
+ *  </ul>
+ *  </li>
+ *  <li>detect files with under-replicated or over-replicated blocks</li>
+ *  </ul>
+ *  Additionally, the tool collects detailed overall DFS statistics, and
+ *  can optionally print detailed statistics on block locations and replication
+ *  factors of each file.
+ *
+ * @author Andrzej Bialecki
+ */
+public class NamenodeFsck {
+  public static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.NameNode");
+  
+  /** Don't attempt any fixing. */
+  public static final int FIXING_NONE = 0;
+  /** Move corrupted files to /lost+found. */
+  public static final int FIXING_MOVE = 1;
+  /** Delete corrupted files. */
+  public static final int FIXING_DELETE = 2;
+  
+  private NameNode nn;
+  private UTF8 lostFound = null;
+  private boolean lfInited = false;
+  private boolean lfInitedOk = false;
+  private boolean showFiles = false;
+  private boolean showBlocks = false;
+  private boolean showLocations = false;
+  private int fixing = FIXING_NONE;
+  private String path = "/";
+  
+  private Configuration conf;
+  private HttpServletResponse response;
+  private PrintWriter out;
+  
+  /**
+   * Filesystem checker.
+   * @param conf current Configuration
+   * @param fixing one of pre-defined values
+   * @param showFiles show each file being checked
+   * @param showBlocks for each file checked show its block information
+   * @param showLocations for each block in each file show block locations
+   * @throws Exception
+   */
+  public NamenodeFsck(Configuration conf,
+      NameNode nn,
+      Map<String,String[]> pmap,
+      HttpServletResponse response) throws IOException {
+    this.conf = conf;
+    this.nn = nn;
+    this.response = response;
+    this.out = response.getWriter();
+    for (Iterator<String> it = pmap.keySet().iterator(); it.hasNext();) {
+      String key = it.next();
+      if (key.equals("path")) { this.path = pmap.get("path")[0]; }
+      else if (key.equals("move")) { this.fixing = FIXING_MOVE; }
+      else if (key.equals("delete")) { this.fixing = FIXING_DELETE; }
+      else if (key.equals("files")) { this.showFiles = true; }
+      else if (key.equals("blocks")) { this.showBlocks = true; }
+      else if (key.equals("locations")) { this.showLocations = true; }
+    }
+  }
+  
+  /**
+   * Check files on DFS, starting from the indicated path.
+   * @throws Exception
+   */
+  public void fsck() throws IOException {
+    try {
+      DFSFileInfo[] files = nn.getListing(path);
+      FsckResult res = new FsckResult();
+      res.setReplication((short) conf.getInt("dfs.replication", 3));
+      if (files != null) {
+        for (int i = 0; i < files.length; i++) {
+          check(files[i], res);
+        }
+      }
+      out.println(res);
+      if (res.isHealthy()) {
+        out.println("\n\nThe filesystem under path '" + path + "' is HEALTHY");
+      } else {
+        out.println("\n\nThe filesystem under path '" + path + "' is CORRUPT");
+      }
+    } finally {
+      out.close();
+    }
+  }
+  
+  private void check(DFSFileInfo file, FsckResult res) throws IOException {
+    if (file.isDir()) {
+      if (showFiles)
+        out.println(file.getPath() + " <dir>");
+      res.totalDirs++;
+      DFSFileInfo[] files = nn.getListing(file.getPath());
+      for (int i = 0; i < files.length; i++) {
+        check(files[i], res);
+      }
+      return;
+    }
+    res.totalFiles++;
+    res.totalSize += file.getLen();
+    LocatedBlock[] blocks = nn.open(file.getPath());
+    res.totalBlocks += blocks.length;
+    if (showFiles) {
+      out.print(file.getPath() + " " + file.getLen() + ", " + blocks.length + " block(s): ");
+    } else {
+      out.print('.');
+      out.flush();
+      if (res.totalFiles % 100 == 0) out.println();
+    }
+    int missing = 0;
+    long missize = 0;
+    StringBuffer report = new StringBuffer();
+    for (int i = 0; i < blocks.length; i++) {
+      Block block = blocks[i].getBlock();
+      long id = block.getBlockId();
+      DatanodeInfo[] locs = blocks[i].getLocations();
+      short targetFileReplication = file.getReplication();
+      if (locs.length > targetFileReplication) res.overReplicatedBlocks += (locs.length - targetFileReplication);
+      if (locs.length < targetFileReplication && locs.length > 0) res.underReplicatedBlocks += (targetFileReplication - locs.length);
+      report.append(i + ". " + id + " len=" + block.getNumBytes());
+      if (locs == null || locs.length == 0) {
+        report.append(" MISSING!");
+        res.addMissing(block.getBlockName(), block.getNumBytes());
+        missing++;
+        missize += block.getNumBytes();
+      } else {
+        report.append(" repl=" + locs.length);
+        if (showLocations) {
+          StringBuffer sb = new StringBuffer("[");
+          for (int j = 0; j < locs.length; j++) {
+            if (j > 0) sb.append(", ");
+            sb.append(locs[j]);
+          }
+          sb.append(']');
+          report.append(" " + sb.toString());
+        }
+      }
+      report.append('\n');
+    }
+    if (missing > 0) {
+      if (!showFiles)
+        out.println("\nMISSING " + missing + " blocks of total size " + missize + " B");
+      res.corruptFiles++;
+      switch(fixing) {
+        case FIXING_NONE:
+          break;
+        case FIXING_MOVE:
+          lostFoundMove(file, blocks);
+          break;
+        case FIXING_DELETE:
+          nn.delete(file.getPath());
+      }
+    }
+    if (showFiles) {
+      if (missing > 0) {
+        out.println(" MISSING " + missing + " blocks of total size " + missize + " B");
+      } else out.println(" OK");
+      if (showBlocks) out.println(report.toString());
+    }
+  }
+  
+  private void lostFoundMove(DFSFileInfo file, LocatedBlock[] blocks)
+  throws IOException {
+    DFSClient dfs = new DFSClient(DataNode.createSocketAddr(
+        conf.get("fs.default.name", "local")), conf);
+    if (!lfInited) {
+      lostFoundInit(dfs);
+    }
+    if (!lfInitedOk) {
+      return;
+    }
+    String target = lostFound.toString() + file.getPath();
+    String errmsg = "Failed to move " + file.getPath() + " to /lost+found";
+    try {
+      if (!nn.mkdirs(target)) {
+        LOG.warn(errmsg);
+        return;
+      }
+      // create chains
+      int chain = 0;
+      FSOutputStream fos = null;
+      for (int i = 0; i < blocks.length; i++) {
+        LocatedBlock lblock = blocks[i];
+        DatanodeInfo[] locs = lblock.getLocations();
+        if (locs == null || locs.length == 0) {
+          if (fos != null) {
+            fos.flush();
+            fos.close();
+            fos = null;
+          }
+          continue;
+        }
+        if (fos == null) {
+          fos = dfs.create(new UTF8(target.toString() + "/" + chain), true);
+          if (fos != null) chain++;
+        }
+        if (fos == null) {
+          LOG.warn(errmsg + ": could not store chain " + chain);
+          // perhaps we should bail out here...
+          // return;
+          continue;
+        }
+        
+        // copy the block. It's a pity it's not abstracted from DFSInputStream ...
+        try {
+          copyBlock(dfs, lblock, fos);
+        } catch (Exception e) {
+          e.printStackTrace();
+          // something went wrong copying this block...
+          LOG.warn(" - could not copy block " + lblock.getBlock().getBlockName() + " to " + target);
+          fos.flush();
+          fos.close();
+          fos = null;
+        }
+      }
+      if (fos != null) fos.close();
+      LOG.warn("\n - moved corrupted file " + file.getPath() + " to /lost+found");
+      dfs.delete(new UTF8(file.getPath()));
+    } catch (Exception e) {
+      e.printStackTrace();
+      LOG.warn(errmsg + ": " + e.getMessage());
+    }
+  }
+      
+  /*
+   * XXX (ab) Bulk of this method is copied verbatim from {@link DFSClient}, which is
+   * bad. Both places should be refactored to provide a method to copy blocks
+   * around.
+   */
+  private void copyBlock(DFSClient dfs, LocatedBlock lblock,
+      FSOutputStream fos) throws Exception {
+    int failures = 0;
+    InetSocketAddress targetAddr = null;
+    TreeSet deadNodes = new TreeSet();
+    Socket s = null;
+    DataInputStream in = null;
+    DataOutputStream out = null;
+    while (s == null) {
+      DatanodeInfo chosenNode;
+      
+      try {
+        chosenNode = bestNode(dfs, lblock.getLocations(), deadNodes);
+        targetAddr = DataNode.createSocketAddr(chosenNode.getName());
+      } catch (IOException ie) {
+        if (failures >= DFSClient.MAX_BLOCK_ACQUIRE_FAILURES) {
+          throw new IOException("Could not obtain block " + lblock);
+        }
+        LOG.info("Could not obtain block from any node:  " + ie);
+        try {
+          Thread.sleep(10000);
+        } catch (InterruptedException iex) {
+        }
+        deadNodes.clear();
+        failures++;
+        continue;
+      }
+      try {
+        s = new Socket();
+        s.connect(targetAddr, FSConstants.READ_TIMEOUT);
+        s.setSoTimeout(FSConstants.READ_TIMEOUT);
+        
+        //
+        // Xmit header info to datanode
+        //
+        out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
+        out.write(FSConstants.OP_READSKIP_BLOCK);
+        lblock.getBlock().write(out);
+        out.writeLong(0L);
+        out.flush();
+        
+        //
+        // Get bytes in block, set streams
+        //
+        in = new DataInputStream(new BufferedInputStream(s.getInputStream()));
+        long curBlockSize = in.readLong();
+        long amtSkipped = in.readLong();
+        if (curBlockSize != lblock.getBlock().len) {
+          throw new IOException("Recorded block size is " + lblock.getBlock().len + ", but datanode reports size of " + curBlockSize);
+        }
+        if (amtSkipped != 0L) {
+          throw new IOException("Asked for offset of " + 0L + ", but only received offset of " + amtSkipped);
+        }
+      } catch (IOException ex) {
+        // Put chosen node into dead list, continue
+        LOG.info("Failed to connect to " + targetAddr + ":" + ex);
+        deadNodes.add(chosenNode);
+        if (s != null) {
+          try {
+            s.close();
+          } catch (IOException iex) {
+          }
+        }
+        s = null;
+      }
+    }
+    if (in == null) {
+      throw new Exception("Could not open data stream for " + lblock.getBlock().getBlockName());
+    }
+    byte[] buf = new byte[1024];
+    int cnt = 0;
+    boolean success = true;
+    try {
+      while ((cnt = in.read(buf)) > 0) {
+        fos.write(buf, 0, cnt);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      success = false;
+    } finally {
+      try {in.close(); } catch (Exception e1) {}
+      try {out.close(); } catch (Exception e1) {}
+      try {s.close(); } catch (Exception e1) {}
+    }
+    if (!success)
+      throw new Exception("Could not copy block data for " + lblock.getBlock().getBlockName());
+  }
+      
+  /*
+   * XXX (ab) See comment above for copyBlock().
+   *
+   * Pick the best node from which to stream the data.
+   * That's the local one, if available.
+   */
+  Random r = new Random();
+  private DatanodeInfo bestNode(DFSClient dfs, DatanodeInfo[] nodes,
+      TreeSet deadNodes) throws IOException {
+    if ((nodes == null) ||
+            (nodes.length - deadNodes.size() < 1)) {
+      throw new IOException("No live nodes contain current block");
+    }
+    DatanodeInfo chosenNode = null;
+    for (int i = 0; i < nodes.length; i++) {
+      if (deadNodes.contains(nodes[i])) {
+        continue;
+      }
+      String nodename = nodes[i].getName();
+      int colon = nodename.indexOf(':');
+      if (colon >= 0) {
+        nodename = nodename.substring(0, colon);
+      }
+      if (dfs.localName.equals(nodename)) {
+        chosenNode = nodes[i];
+        break;
+      }
+    }
+    if (chosenNode == null) {
+      do {
+        chosenNode = nodes[Math.abs(r.nextInt()) % nodes.length];
+      } while (deadNodes.contains(chosenNode));
+    }
+    return chosenNode;
+  }
+  
+  private void lostFoundInit(DFSClient dfs) {
+    lfInited = true;
+    try {
+      UTF8 lfName = new UTF8("/lost+found");
+      // check that /lost+found exists
+      if (!dfs.exists(lfName)) {
+        lfInitedOk = dfs.mkdirs(lfName);
+        lostFound = lfName;
+      } else if (!dfs.isDirectory(lfName)) {
+        LOG.warn("Cannot use /lost+found : a regular file with this name exists.");
+        lfInitedOk = false;
+      } else { // exists and isDirectory
+        lostFound = lfName;
+        lfInitedOk = true;
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      lfInitedOk = false;
+    }
+    if (lostFound == null) {
+      LOG.warn("Cannot initialize /lost+found .");
+      lfInitedOk = false;
+    }
+  }
+  
+  /**
+   * @param args
+   */
+  public int run(String[] args) throws Exception {
+    
+    return 0;
+  }
+  
+  /**
+   * FsckResult of checking, plus overall DFS statistics.
+   *
+   * @author Andrzej Bialecki
+   */
+  public class FsckResult {
+    private ArrayList missingIds = new ArrayList();
+    private long missingSize = 0L;
+    private long corruptFiles = 0L;
+    private long overReplicatedBlocks = 0L;
+    private long underReplicatedBlocks = 0L;
+    private int replication = 0;
+    private long totalBlocks = 0L;
+    private long totalFiles = 0L;
+    private long totalDirs = 0L;
+    private long totalSize = 0L;
+    
+    /**
+     * DFS is considered healthy if there are no missing blocks.
+     * @return
+     */
+    public boolean isHealthy() {
+      return missingIds.size() == 0;
+    }
+    
+    /** Add a missing block name, plus its size. */
+    public void addMissing(String id, long size) {
+      missingIds.add(id);
+      missingSize += size;
+    }
+    
+    /** Return a list of missing block names (as list of Strings). */
+    public ArrayList getMissingIds() {
+      return missingIds;
+    }
+    
+    /** Return total size of missing data, in bytes. */
+    public long getMissingSize() {
+      return missingSize;
+    }
+    
+    public void setMissingSize(long missingSize) {
+      this.missingSize = missingSize;
+    }
+    
+    /** Return the number of over-replicated blocks. */
+    public long getOverReplicatedBlocks() {
+      return overReplicatedBlocks;
+    }
+    
+    public void setOverReplicatedBlocks(long overReplicatedBlocks) {
+      this.overReplicatedBlocks = overReplicatedBlocks;
+    }
+    
+    /** Return the actual replication factor. */
+    public float getReplicationFactor() {
+      if (totalBlocks != 0)
+        return (float) (totalBlocks * replication + overReplicatedBlocks - underReplicatedBlocks) / (float) totalBlocks;
+      else
+        return 0.0f;
+    }
+    
+    /** Return the number of under-replicated blocks. Note: missing blocks are not counted here.*/
+    public long getUnderReplicatedBlocks() {
+      return underReplicatedBlocks;
+    }
+    
+    public void setUnderReplicatedBlocks(long underReplicatedBlocks) {
+      this.underReplicatedBlocks = underReplicatedBlocks;
+    }
+    
+    /** Return total number of directories encountered during this scan. */
+    public long getTotalDirs() {
+      return totalDirs;
+    }
+    
+    public void setTotalDirs(long totalDirs) {
+      this.totalDirs = totalDirs;
+    }
+    
+    /** Return total number of files encountered during this scan. */
+    public long getTotalFiles() {
+      return totalFiles;
+    }
+    
+    public void setTotalFiles(long totalFiles) {
+      this.totalFiles = totalFiles;
+    }
+    
+    /** Return total size of scanned data, in bytes. */
+    public long getTotalSize() {
+      return totalSize;
+    }
+    
+    public void setTotalSize(long totalSize) {
+      this.totalSize = totalSize;
+    }
+    
+    /** Return the intended replication factor, against which the over/under-
+     * replicated blocks are counted. Note: this value comes from the current
+     * Configuration supplied for the tool, so it may be different from the
+     * value in DFS Configuration.
+     */
+    public int getReplication() {
+      return replication;
+    }
+    
+    public void setReplication(int replication) {
+      this.replication = replication;
+    }
+    
+    /** Return the total number of blocks in the scanned area. */
+    public long getTotalBlocks() {
+      return totalBlocks;
+    }
+    
+    public void setTotalBlocks(long totalBlocks) {
+      this.totalBlocks = totalBlocks;
+    }
+    
+    public String toString() {
+      StringBuffer res = new StringBuffer();
+      res.append("Status: " + (isHealthy() ? "HEALTHY" : "CORRUPT"));
+      res.append("\n Total size:\t" + totalSize + " B");
+      res.append("\n Total blocks:\t" + totalBlocks);
+      if (totalBlocks > 0) res.append(" (avg. block size "
+          + (totalSize / totalBlocks) + " B)");
+      res.append("\n Total dirs:\t" + totalDirs);
+      res.append("\n Total files:\t" + totalFiles);
+      if (missingSize > 0) {
+        res.append("\n  ********************************");
+        res.append("\n  CORRUPT FILES:\t" + corruptFiles);
+        res.append("\n  MISSING BLOCKS:\t" + missingIds.size());
+        res.append("\n  MISSING SIZE:\t\t" + missingSize + " B");
+        res.append("\n  ********************************");
+      }
+      res.append("\n Over-replicated blocks:\t" + overReplicatedBlocks);
+      if (totalBlocks > 0)        res.append(" (" + ((float) (overReplicatedBlocks * 100) / (float) totalBlocks) + " %)");
+      res.append("\n Under-replicated blocks:\t" + underReplicatedBlocks);
+      if (totalBlocks > 0)        res.append(" (" + ((float) (underReplicatedBlocks * 100) / (float) totalBlocks) + " %)");
+      res.append("\n Target replication factor:\t" + replication);
+      res.append("\n Real replication factor:\t" + getReplicationFactor());
+      return res.toString();
+    }
+    
+    /** Return the number of corrupted files. */
+    public long getCorruptFiles() {
+      return corruptFiles;
+    }
+    
+    public void setCorruptFiles(long corruptFiles) {
+      this.corruptFiles = corruptFiles;
+    }
+  }
+}
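As a worked example of FsckResult.getReplicationFactor(): for 100 total blocks with a target replication of 3, 10 excess replicas, and 5 replicas short of target, the real replication factor is (100 * 3 + 10 - 5) / 100 = 3.05.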

+ 148 - 0
src/test/org/apache/hadoop/dfs/TestFsck.java

@@ -0,0 +1,148 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+import java.util.Random;
+import junit.framework.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.util.CopyFiles;
+
+
+/**
+ * A JUnit test for doing fsck
+ *
+ * @author Milind Bhandarkar
+ */
+public class TestFsck extends TestCase {
+  
+  private static final int NFILES = 20;
+  private static String TEST_ROOT_DIR =
+    new Path(System.getProperty("test.build.data","/tmp"))
+    .toString().replace(' ', '+');
+
+  /** class MyFile contains enough information to recreate the contents of
+   * a single file.
+   */
+  private static class MyFile {
+    private static Random gen = new Random();
+    private static final int MAX_LEVELS = 3;
+    private static final int MAX_SIZE = 8*1024;
+    private static String[] dirNames = {
+      "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine"
+    };
+    private String name = "";
+    private int size;
+    private long seed;
+    
+    MyFile() {
+      int nLevels = gen.nextInt(MAX_LEVELS);
+      if(nLevels != 0) {
+        int[] levels = new int[nLevels];
+        for (int idx = 0; idx < nLevels; idx++) {
+          levels[idx] = gen.nextInt(10);
+        }
+        StringBuffer sb = new StringBuffer();
+        for (int idx = 0; idx < nLevels; idx++) {
+          sb.append(dirNames[levels[idx]]);
+          sb.append("/");
+        }
+        name = sb.toString();
+      }
+      long fidx = -1;
+      while (fidx < 0) { fidx = gen.nextLong(); }
+      name = name + Long.toString(fidx);
+      size = gen.nextInt(MAX_SIZE);
+      seed = gen.nextLong();
+    }
+    
+    String getName() { return name; }
+    int getSize() { return size; }
+    long getSeed() { return seed; }
+  }
+  
+  public TestFsck(String testName) {
+    super(testName);
+  }
+
+  
+  
+  protected void setUp() throws Exception {
+  }
+
+  protected void tearDown() throws Exception {
+  }
+  
+  /** create NFILES with random names and directory hierarchies
+   * with random (but reproducible) data in them.
+   */
+  private static MyFile[] createFiles(String fsname, String topdir)
+  throws IOException {
+    MyFile[] files = new MyFile[NFILES];
+    
+    for (int idx = 0; idx < NFILES; idx++) {
+      files[idx] = new MyFile();
+    }
+    
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getNamed(fsname, conf);
+    Path root = new Path(topdir);
+    
+    for (int idx = 0; idx < NFILES; idx++) {
+      Path fPath = new Path(root, files[idx].getName());
+      fs.mkdirs(fPath.getParent());
+      FSDataOutputStream out = fs.create(fPath);
+      byte[] toWrite = new byte[files[idx].getSize()];
+      Random rb = new Random(files[idx].getSeed());
+      rb.nextBytes(toWrite);
+      out.write(toWrite);
+      out.close();
+      toWrite = null;
+    }
+    
+    return files;
+  }
+  
+  /** delete directory and everything underneath it.*/
+  private static void deldir(String fsname, String topdir)
+  throws IOException {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getNamed(fsname, conf);
+    Path root = new Path(topdir);
+    fs.delete(root);
+  }
+  
+  /** do fsck */
+  public void testFsck() throws Exception {
+    String namenode = null;
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration conf = new Configuration();
+      cluster = new MiniDFSCluster(65314, conf, false);
+      namenode = conf.get("fs.default.name", "local");
+      if (!"local".equals(namenode)) {
+        MyFile[] files = createFiles(namenode, "/srcdat");
+        assertEquals(0, new DFSck().doMain(conf, new String[] {"/"}));
+        deldir(namenode, "/srcdat");
+      }
+    } finally {
+      if (cluster != null) { cluster.shutdown(); }
+    }
+  }
+}
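Assuming the standard Ant build of this era, the new test can be run on its own with something like ant test -Dtestcase=TestFsck; the property name follows the usual Hadoop build convention and is not something this patch defines.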

+ 6 - 5
src/test/org/apache/hadoop/fs/TestCopyFiles.java

@@ -159,8 +159,9 @@ public class TestCopyFiles extends TestCase {
   /** copy files from local file system to local file system */
   public void testCopyFromLocalToLocal() throws Exception {
     MyFile[] files = createFiles("local", TEST_ROOT_DIR+"/srcdat");
-    CopyFiles.main(new String[] {"file://"+TEST_ROOT_DIR+"/srcdat",
-      "file://"+TEST_ROOT_DIR+"/destdat"});
+    new CopyFiles().doMain(new Configuration(),
+        new String[] {"file://"+TEST_ROOT_DIR+"/srcdat",
+          "file://"+TEST_ROOT_DIR+"/destdat"});
     assertTrue("Source and destination directories do not match.",
     assertTrue("Source and destination directories do not match.",
         checkFiles("local", TEST_ROOT_DIR+"/destdat", files));
         checkFiles("local", TEST_ROOT_DIR+"/destdat", files));
     deldir("local", TEST_ROOT_DIR+"/destdat");
     deldir("local", TEST_ROOT_DIR+"/destdat");
@@ -177,7 +178,7 @@ public class TestCopyFiles extends TestCase {
       namenode = conf.get("fs.default.name", "local");
       if (!"local".equals(namenode)) {
         MyFile[] files = createFiles(namenode, "/srcdat");
-        CopyFiles.main(new String[] {"dfs://"+namenode+"/srcdat",
+        new CopyFiles().doMain(conf, new String[] {"dfs://"+namenode+"/srcdat",
         "dfs://"+namenode+"/destdat"});
         "dfs://"+namenode+"/destdat"});
         assertTrue("Source and destination directories do not match.",
         assertTrue("Source and destination directories do not match.",
             checkFiles(namenode, "/destdat", files));
             checkFiles(namenode, "/destdat", files));
@@ -199,7 +200,7 @@ public class TestCopyFiles extends TestCase {
       namenode = conf.get("fs.default.name", "local");
       if (!"local".equals(namenode)) {
         MyFile[] files = createFiles("local", TEST_ROOT_DIR+"/srcdat");
-        CopyFiles.main(new String[] {"file://"+TEST_ROOT_DIR+"/srcdat",
+        new CopyFiles().doMain(conf, new String[] {"file://"+TEST_ROOT_DIR+"/srcdat",
         "dfs://"+namenode+"/destdat"});
         "dfs://"+namenode+"/destdat"});
         assertTrue("Source and destination directories do not match.",
         assertTrue("Source and destination directories do not match.",
             checkFiles(namenode, "/destdat", files));
             checkFiles(namenode, "/destdat", files));
@@ -221,7 +222,7 @@ public class TestCopyFiles extends TestCase {
       namenode = conf.get("fs.default.name", "local");
       if (!"local".equals(namenode)) {
         MyFile[] files = createFiles(namenode, "/srcdat");
-        CopyFiles.main(new String[] {"dfs://"+namenode+"/srcdat",
+        new CopyFiles().doMain(conf, new String[] {"dfs://"+namenode+"/srcdat",
         "file://"+TEST_ROOT_DIR+"/destdat"});
         "file://"+TEST_ROOT_DIR+"/destdat"});
         assertTrue("Source and destination directories do not match.",
         assertTrue("Source and destination directories do not match.",
             checkFiles("local", TEST_ROOT_DIR+"/destdat", files));
             checkFiles("local", TEST_ROOT_DIR+"/destdat", files));