
Merge -r 804755:805652 from trunk to move all the changes to the append branch.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-265@805987 13f79535-47bb-0310-9956-ffa450edef68
Hairong Kuang 16 years ago
Parent
Commit
99a2267e7b
23 changed files with 798 additions and 104 deletions
  1. CHANGES.txt (+19 -9)
  2. lib/hadoop-core-0.21.0-dev.jar (BIN)
  3. lib/hadoop-core-test-0.21.0-dev.jar (BIN)
  4. lib/hadoop-mapred-0.21.0-dev.jar (BIN)
  5. lib/hadoop-mapred-examples-0.21.0-dev.jar (BIN)
  6. lib/hadoop-mapred-test-0.21.0-dev.jar (BIN)
  7. src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java (+1 -0)
  8. src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (+16 -0)
  9. src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (+37 -10)
  10. src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (+137 -10)
  11. src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (+6 -0)
  12. src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (+2 -0)
  13. src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java (+2 -1)
  14. src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java (+50 -57)
  15. src/test/aop/org/apache/hadoop/fi/Pipeline.java (+52 -0)
  16. src/test/aop/org/apache/hadoop/fi/PipelineTest.java (+28 -0)
  17. src/test/aop/org/apache/hadoop/hdfs/protocol/ClientProtocolAspects.aj (+6 -3)
  18. src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj (+5 -2)
  19. src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj (+7 -3)
  20. src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java (+56 -9)
  21. src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (+4 -0)
  22. src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (+369 -0)
  23. src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java (+1 -0)

+ 19 - 9
CHANGES.txt

@@ -49,7 +49,7 @@ Trunk (unreleased changes)
     via szetszwo)
 
     HDFS-493. Change build.xml so that the fault-injected tests are executed
-    only by the run-test-*-faul-inject targets.  (Konstantin Boudnik via
+    only by the run-test-*-fault-inject targets.  (Konstantin Boudnik via
     szetszwo)
 
     HDFS-446. Improvements to Offline Image Viewer. (Jakob Homan via shv)
@@ -83,20 +83,28 @@ Trunk (unreleased changes)
 
     HDFS-524. Further DataTransferProtocol code refactoring.  (szetszwo)
 
-    HDFS-527. Remove/deprecate unnecessary DFSClient constructors.  (szetszwo)
-
     HDFS-529. Use BlockInfo instead of Block to avoid redundant block searches
     in BlockManager. (shv)
 
     HDFS-530. Refactor TestFileAppend* to remove code duplication.
     (Konstantin Boudnik via szetszwo)
 
-    HDFS-451. Add fault injection tests, Pipeline_Fi_06,07,14,15, for
-    DataTransferProtocol.  (szetszwo)
+    HDFS-451. Add fault injection tests for DataTransferProtocol.  (szetszwo)
 
     HDFS-509. Redesign DataNode volumeMap to include all types of Replicas.
     (hairong)
     
+    HDFS-457. Do not shutdown datanode if some, but not all, volumes fail.
+    (Boris Shkolnik via szetszwo)
+
+    HDFS-548. TestFsck takes nearly 10 minutes to run. (hairong)
+
+    HDFS-539. Refactor fault injection pipeline test util for future reuse.
+    (Konstantin Boudnik via szetszwo)
+
+    HDFS-552. Change TestFiDataTransferProtocol to junit 4 and add a few new
+    tests.  (szetszwo)
+
   BUG FIXES
 
     HDFS-76. Better error message to users when commands fail because of 
@@ -109,7 +117,7 @@ Trunk (unreleased changes)
     HADOOP-6096. Fix Eclipse project and classpath files following project
     split. (tomwhite)
 
-    HDFS-195. Handle expired tokens when write pipeline is restablished.
+    HDFS-195. Handle expired tokens when write pipeline is reestablished.
     (Kan Zhang via rangadi)
 
     HDFS-181. Validate src path in FSNamesystem.getFileInfo(..).  (Todd
@@ -148,9 +156,6 @@ Trunk (unreleased changes)
     HDFS-119. Fix a bug in logSync(), which causes NameNode block forever.
     (Suresh Srinivas via shv)
 
-    HDFS-167. Fix a bug in DFSClient that caused infinite retries on write.
-    (Bill Zeller via szetszwo)
-
     HDFS-534. Include avro in ivy.  (szetszwo)
 
 Release 0.20.1 - Unreleased
@@ -163,3 +168,8 @@ Release 0.20.1 - Unreleased
 
     HDFS-525. The SimpleDateFormat object in ListPathsServlet is not thread
     safe.  (Suresh Srinivas via szetszwo)
+
+    HDFS-167. Fix a bug in DFSClient that caused infinite retries on write.
+    (Bill Zeller via szetszwo)
+
+    HDFS-527. Remove/deprecate unnecessary DFSClient constructors.  (szetszwo)

BIN
lib/hadoop-core-0.21.0-dev.jar


BIN
lib/hadoop-core-test-0.21.0-dev.jar


BIN
lib/hadoop-mapred-0.21.0-dev.jar


BIN
lib/hadoop-mapred-examples-0.21.0-dev.jar


BIN
lib/hadoop-mapred-test-0.21.0-dev.jar


+ 1 - 0
src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java

@@ -56,6 +56,7 @@ public interface FSConstants {
   public static final int DEFAULT_DATA_SOCKET_SIZE = 128 * 1024;
 
   public static final int SIZE_OF_INTEGER = Integer.SIZE / Byte.SIZE;
+  public static final int MIN_NUM_OF_VALID_VOLUMES = 1;// for a DN to run
 
   // SafeMode actions
   public enum SafeModeAction{ SAFEMODE_LEAVE, SAFEMODE_ENTER, SAFEMODE_GET; }

+ 16 - 0
src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -119,6 +119,9 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
       
       // check if there is a disk error
       IOException cause = FSDataset.getCauseIfDiskError(ioe);
+      DataNode.LOG.warn("IOException in BlockReceiver constructor. Cause is ",
+          cause);
+      
       if (cause != null) { // possible disk error
         ioe = cause;
         datanode.checkDiskError(ioe); // may throw an exception here
@@ -833,7 +836,14 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
             SUCCESS.write(replyOut);
             replyOut.flush();
         } catch (Exception e) {
+          LOG.warn("IOException in BlockReceiver.lastNodeRun: ", e);
           if (running) {
+            try {
+              datanode.checkDiskError(e); // may throw an exception here
+            } catch (IOException ioe) {
+              LOG.warn("DataNode.chekDiskError failed in lastDataNodeRun with: ",
+                  ioe);
+            }
             LOG.info("PacketResponder " + block + " " + numTargets + 
                      " Exception " + StringUtils.stringifyException(e));
             running = false;
@@ -993,7 +1003,13 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
               running = false;
             }
         } catch (IOException e) {
+          LOG.warn("IOException in BlockReceiver.run(): ", e);
           if (running) {
+            try {
+              datanode.checkDiskError(e); // may throw an exception here
+            } catch (IOException ioe) {
+              LOG.warn("DataNode.chekDiskError failed in run() with: ", ioe);
+            }
             LOG.info("PacketResponder " + block + " " + numTargets + 
                      " Exception " + StringUtils.stringifyException(e));
             running = false;

+ 37 - 10
src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -694,11 +694,14 @@ public class DataNode extends Configured
   }
   
   
-  /* Check if there is no space in disk or the disk is read-only
-   *  when IOException occurs. 
-   * If so, handle the error */
-  protected void checkDiskError( IOException e ) throws IOException {
-    if (e.getMessage() != null && 
+  /** Check if there is no space left on disk.
+   *  @param e the exception that caused this checkDiskError call
+   **/
+  protected void checkDiskError(Exception e ) throws IOException {
+    
+    LOG.warn("checkDiskError: exception: ", e);
+    
+    if (e.getMessage() != null &&
         e.getMessage().startsWith("No space left on device")) {
       throw new DiskOutOfSpaceException("No space left on device");
     } else {
@@ -706,8 +709,11 @@ public class DataNode extends Configured
     }
   }
   
-  /* Check if there is no disk space and if so, handle the error*/
-  protected void checkDiskError( ) throws IOException {
+  /**
+   *  Check if there is a disk failure and if so, handle the error
+   *
+   **/
+  protected void checkDiskError( ) {
     try {
       data.checkDataDir();
     } catch(DiskErrorException de) {
@@ -716,13 +722,31 @@ public class DataNode extends Configured
   }
   
   private void handleDiskError(String errMsgr) {
-    LOG.warn("DataNode is shutting down.\n" + errMsgr);
-    shouldRun = false;
+    boolean hasEnoughResource = data.hasEnoughResource();
+    LOG.warn("DataNode.handleDiskError: Keep Running: " + hasEnoughResource);
+    
+    // if hasEnoughResource is true, more volumes are available, so we don't want
+    // to shut down the DN completely and don't want the NN to remove it.
+    int dp_error = DatanodeProtocol.DISK_ERROR;
+    if(hasEnoughResource == false) {
+      // DN will be shutdown and NN should remove it
+      dp_error = DatanodeProtocol.FATAL_DISK_ERROR;
+    }
+    //inform NameNode
     try {
       namenode.errorReport(
-                           dnRegistration, DatanodeProtocol.DISK_ERROR, errMsgr);
+                           dnRegistration, dp_error, errMsgr);
     } catch(IOException ignored) {              
     }
+    
+    
+    if(hasEnoughResource) {
+      scheduleBlockReport(0);
+      return; // do not shutdown
+    }
+    
+    LOG.warn("DataNode is shutting down.\n" + errMsgr);
+    shouldRun = false; 
   }
     
   /** Number of concurrent xceivers per node. */
@@ -1238,6 +1262,9 @@ public class DataNode extends Configured
       } catch (IOException ie) {
         LOG.warn(dnRegistration + ":Failed to transfer " + b + " to " + targets[0].getName()
             + " got " + StringUtils.stringifyException(ie));
+        // check if there are any disk problems
+        datanode.checkDiskError();
+        
       } finally {
         xmitsInProgress.getAndDecrement();
         IOUtils.closeStream(blockSender);
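
Taken together, the DataNode.java hunks above replace the old "any disk error shuts the node down" behaviour with a branch on FSDataset.hasEnoughResource(). The sketch below is a condensed restatement of the patched handleDiskError for readers skimming the diff; the field and method names are the ones in the patch, the comments are the editor's reading, and the surrounding class boilerplate is omitted:

    // Condensed restatement of the patched DataNode.handleDiskError(String).
    private void handleDiskError(String errMsgr) {
      boolean hasEnoughResource = data.hasEnoughResource(); // >= 1 valid volume left?

      // DISK_ERROR keeps the node registered; FATAL_DISK_ERROR tells the
      // NameNode to remove it (see the NameNode and DatanodeProtocol hunks below).
      int dpError = hasEnoughResource ? DatanodeProtocol.DISK_ERROR
                                      : DatanodeProtocol.FATAL_DISK_ERROR;
      try {
        namenode.errorReport(dnRegistration, dpError, errMsgr);
      } catch (IOException ignored) {
      }

      if (hasEnoughResource) {
        scheduleBlockReport(0); // immediate block report so the NameNode sees lost replicas
        return;                 // keep serving from the surviving volumes
      }

      LOG.warn("DataNode is shutting down.\n" + errMsgr);
      shouldRun = false;        // no usable volumes left
    }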

+ 137 - 10
src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java

@@ -17,18 +17,35 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import java.io.*;
-import java.util.*;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
 
 import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
 import javax.management.StandardMBean;
 
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.DF;
+import org.apache.hadoop.fs.DU;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
+import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.apache.hadoop.metrics.util.MBeanUtil;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker;
@@ -40,6 +57,8 @@ import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 
+import org.mortbay.log.Log;
+
 /**************************************************
  * FSDataset manages a set of data blocks.  Each block
  * has a unique name and an extent on disk.
@@ -468,9 +487,25 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
     FSVolumeSet(FSVolume[] volumes) {
       this.volumes = volumes;
     }
+    
+    private int numberOfVolumes() {
+      return volumes.length;
+    }
       
     synchronized FSVolume getNextVolume(long blockSize) throws IOException {
+      
+      if(volumes.length < 1) {
+        throw new DiskOutOfSpaceException("No more available volumes");
+      }
+      
+      // since volumes could've been removed because of the failure
+      // make sure we are not out of bounds
+      if(curVolume >= volumes.length) {
+        curVolume = 0;
+      }
+      
       int startVolume = curVolume;
+      
       while (true) {
         FSVolume volume = volumes[curVolume];
         curVolume = (curVolume + 1) % volumes.length;
@@ -511,10 +546,46 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
       }
     }
       
-    synchronized void checkDirs() throws DiskErrorException {
+    /**
+     * Goes over all the volumes and calls checkDirs() on each one of them;
+     * if one throws DiskErrorException, it is removed from the list of
+     * active volumes.
+     * @return list of all the removed volumes
+     */
+    synchronized List<FSVolume> checkDirs() {
+      
+      ArrayList<FSVolume> removed_vols = null;  
+      
       for (int idx = 0; idx < volumes.length; idx++) {
-        volumes[idx].checkDirs();
+        FSVolume fsv = volumes[idx];
+        try {
+          fsv.checkDirs();
+        } catch (DiskErrorException e) {
+          DataNode.LOG.warn("Removing failed volume " + fsv + ": ",e);
+          if(removed_vols == null) {
+            removed_vols = new ArrayList<FSVolume>(1);
+          }
+          removed_vols.add(volumes[idx]);
+          volumes[idx] = null; //remove the volume
+        }
+      }
+      
+      // repair array - copy non null elements
+      int removed_size = (removed_vols==null)? 0 : removed_vols.size();
+      if(removed_size > 0) {
+        FSVolume fsvs[] = new FSVolume [volumes.length-removed_size];
+        for(int idx=0,idy=0; idx<volumes.length; idx++) {
+          if(volumes[idx] != null) {
+            fsvs[idy] = volumes[idx];
+            idy++;
+          }
+        }
+        volumes = fsvs; // replace array of volumes
       }
+      Log.info("Completed FSVolumeSet.checkDirs. Removed=" + removed_size + 
+          "volumes. List of current volumes: " +   toString());
+      
+      return removed_vols;
     }
       
     public String toString() {
@@ -656,7 +727,14 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
       return volumes.getDfsUsed();
     }
   }
-  
+  /**
+   * Return true - if there are still valid volumes 
+   * on the DataNode
+   */
+  public boolean hasEnoughResource(){
+    return volumes.numberOfVolumes() >= MIN_NUM_OF_VALID_VOLUMES;
+  }
+
   /**
    * Return total capacity, used and unused
    */
@@ -1226,11 +1304,19 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
   /**
    * Find the file corresponding to the block and return it if it exists.
    */
-  File validateBlockFile(Block b) {
+  File validateBlockFile(Block b) throws IOException {
     //Should we check for metadata file too?
     File f = getFile(b);
-    if(f != null && f.exists())
-      return f;
+    
+    if(f != null ) {
+      if(f.exists())
+        return f;
+   
+      // if file is not null, but doesn't exist - possibly disk failed
+      DataNode datanode = DataNode.getDataNode();
+      datanode.checkDiskError();
+    }
+    
     if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
       InterDatanodeProtocol.LOG.debug("b=" + b + ", f=" + f);
     }
@@ -1375,10 +1461,51 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
   }
   /**
    * check if a data directory is healthy
+   * if some volumes failed - make sure to remove all the blocks that belong
+   * to these volumes
    * @throws DiskErrorException
    */
   public void checkDataDir() throws DiskErrorException {
-    volumes.checkDirs();
+    long total_blocks=0, removed_blocks=0;
+    List<FSVolume> failed_vols =  volumes.checkDirs();
+    
+    // if there are no failed volumes, return
+    if(failed_vols == null) 
+      return;
+    
+    // else 
+    // remove related blocks
+    long mlsec = System.currentTimeMillis();
+    synchronized (this) {
+      Iterator<ReplicaInfo> ib = volumeMap.replicas().iterator();
+      while(ib.hasNext()) {
+        ReplicaInfo b = ib.next();
+        total_blocks ++;
+        // check if the volume this block belongs to is still valid
+        FSVolume vol = b.getVolume();
+        for(FSVolume fv: failed_vols) {
+          if(vol == fv) {
+            DataNode.LOG.warn("removing block " + b.getBlockId() + " from vol " 
+                + vol.dataDir.dir.getAbsolutePath());
+            ib.remove();
+            removed_blocks++;
+            break;
+          }
+        }
+      }
+    } // end of sync
+    mlsec = System.currentTimeMillis() - mlsec;
+    DataNode.LOG.warn(">>>>>>>>>>>>Removed " + removed_blocks + " out of " + total_blocks +
+        "(took " + mlsec + " millisecs)");
+
+    // report the error
+    StringBuilder sb = new StringBuilder();
+    for(FSVolume fv : failed_vols) {
+      sb.append(fv.dataDir.dir.getAbsolutePath() + ";");
+    }
+
+    throw  new DiskErrorException("DataNode failed volumes:" + sb);
+  
   }
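
The array bookkeeping in FSVolumeSet.checkDirs() above (null out the failed slots, then copy the survivors into a shorter array) is easier to follow when restated with a list. The following is an equivalent sketch, not the patch's code; it reuses the FSVolume, DiskErrorException and DataNode.LOG names from the surrounding hunks:

    // Equivalent sketch of FSVolumeSet.checkDirs(): probe every volume, drop the
    // ones whose directories fail the check, and return the failed ones (or null).
    synchronized List<FSVolume> checkDirs() {
      List<FSVolume> good = new ArrayList<FSVolume>(volumes.length);
      List<FSVolume> removed = new ArrayList<FSVolume>();
      for (FSVolume v : volumes) {
        try {
          v.checkDirs();                      // throws DiskErrorException on failure
          good.add(v);
        } catch (DiskErrorException e) {
          DataNode.LOG.warn("Removing failed volume " + v + ": ", e);
          removed.add(v);
        }
      }
      if (!removed.isEmpty()) {
        volumes = good.toArray(new FSVolume[good.size()]); // shrink the active set
      }
      return removed.isEmpty() ? null : removed;           // null: nothing failed
    }

checkDataDir() then walks volumeMap, drops every replica whose volume is in the returned list, and rethrows a DiskErrorException so that DataNode.checkDiskError() can route the failure through handleDiskError().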
     
 

+ 6 - 0
src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java

@@ -264,4 +264,10 @@ public interface FSDatasetInterface extends FSDatasetMBean {
    * @throws IOException
    */
   public void validateBlockMetadata(Block b) throws IOException;
+
+  /**
+   * Checks how many valid storage volumes there are in the DataNode.
+   * @return true if at least the minimum number of valid volumes is left in the FSDataSet
+   */
+  public boolean hasEnoughResource();
 }

+ 2 - 0
src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java

@@ -942,6 +942,8 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
     }
     verifyRequest(nodeReg);
     if (errorCode == DatanodeProtocol.DISK_ERROR) {
+      LOG.warn("Volume failed on " + dnName); 
+    } else if (errorCode == DatanodeProtocol.FATAL_DISK_ERROR) {
       namesystem.removeDatanode(nodeReg);            
     }
   }

+ 2 - 1
src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java

@@ -42,8 +42,9 @@ public interface DatanodeProtocol extends VersionedProtocol {
   
   // error code
   final static int NOTIFY = 0;
-  final static int DISK_ERROR = 1;
+  final static int DISK_ERROR = 1; // there are still valid volumes on DN
   final static int INVALID_BLOCK = 2;
+  final static int FATAL_DISK_ERROR = 3; // no valid volumes left on DN
 
   /**
    * Determines actions that data node should perform 

+ 50 - 57
src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java

@@ -17,88 +17,80 @@
  */
 package org.apache.hadoop.fi;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.hadoop.fi.FiTestUtil.Action;
 import org.apache.hadoop.fi.FiTestUtil.ActionContainer;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * Utilities for DataTransferProtocol related tests,
  * e.g. TestFiDataTransferProtocol.
  */
 public class DataTransferTestUtil {
-  private static DataTransferTest thepipelinetest;
+  protected static PipelineTest thepipelinetest;
   /** initialize pipeline test */
-  public static DataTransferTest initTest() {
+  public static PipelineTest initTest() {
     return thepipelinetest = new DataTransferTest();
   }
   /** get the pipeline test object */
-  public static DataTransferTest getPipelineTest() {
+  public static PipelineTest getPipelineTest() {
     return thepipelinetest;
   }
+  /** get the pipeline test object cast to DataTransferTest */
+  public static DataTransferTest getDataTransferTest() {
+    return (DataTransferTest)getPipelineTest();
+  }
 
   /**
    * The DataTransferTest class includes a pipeline
    * and some actions.
    */
-  public static class DataTransferTest {
-    private Pipeline thepipeline;
+  public static class DataTransferTest implements PipelineTest {
+    private List<Pipeline> pipelines = new ArrayList<Pipeline>();
+
     /** Simulate action for the receiverOpWriteBlock pointcut */
-    public final ActionContainer<DataNode> fiReceiverOpWriteBlock
-        = new ActionContainer<DataNode>();
+    public final ActionContainer<DatanodeID> fiReceiverOpWriteBlock
+        = new ActionContainer<DatanodeID>();
     /** Simulate action for the callReceivePacket pointcut */
-    public final ActionContainer<DataNode> fiCallReceivePacket
-        = new ActionContainer<DataNode>();
+    public final ActionContainer<DatanodeID> fiCallReceivePacket
+        = new ActionContainer<DatanodeID>();
     /** Simulate action for the statusRead pointcut */
-    public final ActionContainer<DataNode> fiStatusRead
-        = new ActionContainer<DataNode>();
+    public final ActionContainer<DatanodeID> fiStatusRead
+        = new ActionContainer<DatanodeID>();
 
     /** Initialize the pipeline. */
     public Pipeline initPipeline(LocatedBlock lb) {
-      if (thepipeline != null) {
+      final Pipeline pl = new Pipeline(lb);
+      if (pipelines.contains(pl)) {
         throw new IllegalStateException("thepipeline != null");
       }
-      return thepipeline = new Pipeline(lb);
+      pipelines.add(pl);
+      return pl;
     }
 
     /** Return the pipeline. */
-    public Pipeline getPipeline() {
-      if (thepipeline == null) {
+    public Pipeline getPipeline(DatanodeID id) {
+      if (pipelines == null) {
         throw new IllegalStateException("thepipeline == null");
       }
-      return thepipeline;
-    }
-  }
-
-  /** A pipeline contains a list of datanodes. */
-  public static class Pipeline {
-    private final List<String> datanodes = new ArrayList<String>();
-    
-    private Pipeline(LocatedBlock lb) {
-      for(DatanodeInfo d : lb.getLocations()) {
-        datanodes.add(d.getName());
+      StringBuilder dnString = new StringBuilder();
+      for (Pipeline pipeline : pipelines) {
+        for (DatanodeInfo dni : pipeline.getDataNodes())
+          dnString.append(dni.getStorageID());
+        if (dnString.toString().contains(id.getStorageID()))
+          return pipeline;
       }
-    }
-
-    /** Does the pipeline contains d at the n th position? */
-    public boolean contains(int n, DatanodeID d) {
-      return d.getName().equals(datanodes.get(n));
-    }
-
-    /** {@inheritDoc} */
-    public String toString() {
-      return getClass().getSimpleName() + datanodes;
+      return null;
     }
   }
 
   /** Action for DataNode */
-  public static abstract class DataNodeAction implements Action<DataNode> {
+  public static abstract class DataNodeAction implements Action<DatanodeID> {
     /** The name of the test */
     final String currentTest;
     /** The index of the datanode */
@@ -108,7 +100,7 @@ public class DataTransferTestUtil {
      * @param currentTest The name of the test
      * @param index The index of the datanode
      */
-    private DataNodeAction(String currentTest, int index) {
+    protected DataNodeAction(String currentTest, int index) {
       this.currentTest = currentTest;
       this.index = index;
     }
@@ -118,10 +110,11 @@ public class DataTransferTestUtil {
       return currentTest + ", index=" + index;
     }
 
-    /** {@inheritDoc} */
-    String toString(DataNode datanode) {
+    /** {@inheritDoc}
+     * @param datanodeID*/
+    String toString(DatanodeID datanodeID) {
       return "FI: " + this + ", datanode="
-          + datanode.getDatanodeRegistration().getName();
+          + datanodeID.getName();
     }
   }
 
@@ -133,10 +126,10 @@ public class DataTransferTestUtil {
     }
 
     @Override
-    public void run(DataNode datanode) {
-      final Pipeline p = getPipelineTest().getPipeline();
-      if (p.contains(index, datanode.getDatanodeRegistration())) {
-        final String s = toString(datanode);
+    public void run(DatanodeID id) {
+      final Pipeline p = getPipelineTest().getPipeline(id);
+      if (p.contains(index, id)) {
+        final String s = toString(id);
         FiTestUtil.LOG.info(s);
         throw new OutOfMemoryError(s);
       }
@@ -151,10 +144,10 @@ public class DataTransferTestUtil {
     }
 
     @Override
-    public void run(DataNode datanode) throws DiskOutOfSpaceException {
-      final Pipeline p = getPipelineTest().getPipeline();
-      if (p.contains(index, datanode.getDatanodeRegistration())) {
-        final String s = toString(datanode);
+    public void run(DatanodeID id) throws DiskOutOfSpaceException {
+      final Pipeline p = getPipelineTest().getPipeline(id);
+      if (p.contains(index, id)) {
+        final String s = toString(id);
         FiTestUtil.LOG.info(s);
         throw new DiskOutOfSpaceException(s);
       }
@@ -179,10 +172,10 @@ public class DataTransferTestUtil {
     }
 
     @Override
-    public void run(DataNode datanode) {
-      final Pipeline p = getPipelineTest().getPipeline();
-      if (p.contains(index, datanode.getDatanodeRegistration())) {
-        final String s = toString(datanode) + ", duration=" + duration;
+    public void run(DatanodeID id) {
+      final Pipeline p = getPipelineTest().getPipeline(id);
+      if (p.contains(index, id)) {
+        final String s = toString(id) + ", duration=" + duration;
         FiTestUtil.LOG.info(s);
         if (duration <= 0) {
           for(; true; FiTestUtil.sleep(1000)); //sleep forever
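
Because the DataNodeAction constructor is now protected and actions are keyed by DatanodeID instead of DataNode, new fault actions can be defined outside DataTransferTestUtil. The class below is a hypothetical example, not part of the patch; it assumes it sits in the org.apache.hadoop.fi package (so it can reach the package-private toString(DatanodeID) helper) and that FiTestUtil.Action.run is declared to throw IOException, as DoosAction's checked DiskOutOfSpaceException suggests:

    // Hypothetical custom action: fail the datanode at pipeline position `idx`
    // with a plain IOException, mirroring OomAction/DoosAction above.
    public class IoeAction extends DataTransferTestUtil.DataNodeAction {
      private final int idx;

      public IoeAction(String currentTest, int index) {
        super(currentTest, index);
        this.idx = index;
      }

      @Override
      public void run(DatanodeID id) throws IOException {
        final Pipeline p = DataTransferTestUtil.getPipelineTest().getPipeline(id);
        if (p != null && p.contains(idx, id)) {
          final String s = toString(id);  // package-private helper from DataNodeAction
          FiTestUtil.LOG.info(s);
          throw new IOException(s);
        }
      }
    }

A test would then wire it exactly like the existing actions, e.g. DataTransferTestUtil.getDataTransferTest().fiCallReceivePacket.set(new IoeAction(methodName, 1)).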

+ 52 - 0
src/test/aop/org/apache/hadoop/fi/Pipeline.java

@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fi;
+
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+
+import java.util.List;
+import java.util.ArrayList;
+
+public class Pipeline {
+  private final List<String> datanodes = new ArrayList<String>();
+  private DatanodeInfo[] nodes;
+
+  Pipeline(LocatedBlock lb) {
+    for(DatanodeInfo d : lb.getLocations()) {
+      datanodes.add(d.getName());
+    }
+    nodes = lb.getLocations();
+  }
+
+  /** Does the pipeline contain d at the n-th position? */
+  public boolean contains(int n, DatanodeID d) {
+    return d.getName().equals(datanodes.get(n));
+  }
+
+  /** Returns DatanodeInfo[] of the nodes of the constructed pipeline. */
+  public DatanodeInfo[] getDataNodes () {
+    return nodes;
+  }
+
+  /** {@inheritDoc} */
+  public String toString() {
+    return getClass().getSimpleName() + datanodes;
+  }
+}
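
For orientation, this is how a hypothetical caller inside a fault-injection test would query a Pipeline once the ClientProtocolAspects advice further down has fed a LocatedBlock into initPipeline; the variable id (the DatanodeID of the node under test) is an assumption of this sketch, not something the patch defines:

    // Hypothetical lookup in a fault-injection test; `id` identifies the datanode
    // on which the intercepted call is running.
    PipelineTest pt = DataTransferTestUtil.getPipelineTest();
    Pipeline pipeline = pt.getPipeline(id);          // pipeline containing datanode `id`
    DatanodeInfo[] nodes = pipeline.getDataNodes();  // datanodes in pipeline order
    boolean isFirst = pipeline.contains(0, id);      // is `id` the first datanode?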

+ 28 - 0
src/test/aop/org/apache/hadoop/fi/PipelineTest.java

@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fi;
+
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+
+/** A pipeline contains a list of datanodes. */
+public interface PipelineTest {
+  public Pipeline initPipeline(LocatedBlock lb);
+  public Pipeline getPipeline(DatanodeID id);
+}

+ 6 - 3
src/test/aop/org/apache/hadoop/hdfs/protocol/ClientProtocolAspects.aj

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.protocol;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fi.DataTransferTestUtil;
+import org.apache.hadoop.fi.PipelineTest;
 
 /** Aspect for ClientProtocol */
 public aspect ClientProtocolAspects {
@@ -29,7 +30,9 @@ public aspect ClientProtocolAspects {
     call(LocatedBlock ClientProtocol.addBlock(String, String));
 
   after() returning(LocatedBlock lb): addBlock() {
-    LOG.info("FI: addBlock "
-        + DataTransferTestUtil.getPipelineTest().initPipeline(lb));
+    PipelineTest pipelineTest = DataTransferTestUtil.getPipelineTest();
+    if (pipelineTest != null)
+      LOG.info("FI: addBlock "
+          + pipelineTest.initPipeline(lb));
   }
-}
+}

+ 5 - 2
src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj

@@ -22,6 +22,7 @@ import java.io.OutputStream;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
 import org.apache.hadoop.fi.DataTransferTestUtil;
 import org.apache.hadoop.fi.ProbabilityModel;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
@@ -44,8 +45,10 @@ public aspect BlockReceiverAspects {
   before(BlockReceiver blockreceiver
       ) throws IOException : callReceivePacket(blockreceiver) {
     LOG.info("FI: callReceivePacket");
-    DataTransferTestUtil.getPipelineTest().fiCallReceivePacket.run(
-        blockreceiver.getDataNode());
+    DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
+    if (dtTest != null)
+      dtTest.fiCallReceivePacket.run(
+          blockreceiver.getDataNode().getDatanodeRegistration());
 
     if (ProbabilityModel.injectCriteria(BlockReceiver.class.getSimpleName())) {
       LOG.info("Before the injection point");

+ 7 - 3
src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj

@@ -24,6 +24,7 @@ import java.io.IOException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fi.DataTransferTestUtil;
+import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Receiver;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
@@ -59,7 +60,8 @@ public aspect DataTransferProtocolAspects {
     final DataNode d = dataxceiver.getDataNode();
     LOG.info("FI: statusRead " + status + ", datanode="
         + d.getDatanodeRegistration().getName());    
-    DataTransferTestUtil.getPipelineTest().fiStatusRead.run(d);
+    DataTransferTestUtil.getDataTransferTest().fiStatusRead.run(
+        d.getDatanodeRegistration());
   }
 
   pointcut receiverOpWriteBlock(DataXceiver dataxceiver):
@@ -68,7 +70,9 @@ public aspect DataTransferProtocolAspects {
   before(DataXceiver dataxceiver
       ) throws IOException: receiverOpWriteBlock(dataxceiver) {
     LOG.info("FI: receiverOpWriteBlock");
-    DataTransferTestUtil.getPipelineTest().fiReceiverOpWriteBlock.run(
-        dataxceiver.getDataNode());
+    DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
+    if (dtTest != null)
+      dtTest.fiReceiverOpWriteBlock.run(
+          dataxceiver.getDataNode().getDatanodeRegistration());
   }
 }

+ 56 - 9
src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java

@@ -24,6 +24,7 @@ import org.apache.hadoop.fi.DataTransferTestUtil;
 import org.apache.hadoop.fi.FiTestUtil;
 import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
 import org.apache.hadoop.fi.DataTransferTestUtil.DoosAction;
+import org.apache.hadoop.fi.DataTransferTestUtil.OomAction;
 import org.apache.hadoop.fi.DataTransferTestUtil.SleepAction;
 import org.apache.hadoop.fi.FiTestUtil.Action;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -31,9 +32,12 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.junit.Assert;
+import org.junit.Test;
 
 /** Test DataTransferProtocol with fault injection. */
-public class TestFiDataTransferProtocol extends junit.framework.TestCase {
+public class TestFiDataTransferProtocol {
   static final short REPLICATION = 3;
   static final long BLOCKSIZE = 1L * (1L << 20);
 
@@ -69,7 +73,7 @@ public class TestFiDataTransferProtocol extends junit.framework.TestCase {
       final FSDataInputStream in = dfs.open(p);
       final int b = in.read();
       in.close();
-      assertEquals(1, b);
+      Assert.assertEquals(1, b);
     }
     finally {
       cluster.shutdown();
@@ -79,7 +83,7 @@ public class TestFiDataTransferProtocol extends junit.framework.TestCase {
   private static void runSlowDatanodeTest(String methodName, SleepAction a
                   ) throws IOException {
     FiTestUtil.LOG.info("Running " + methodName + " ...");
-    final DataTransferTest t = DataTransferTestUtil.initTest();
+    final DataTransferTest t = (DataTransferTest)DataTransferTestUtil.initTest();
     t.fiCallReceivePacket.set(a);
     t.fiReceiverOpWriteBlock.set(a);
     t.fiStatusRead.set(a);
@@ -90,7 +94,8 @@ public class TestFiDataTransferProtocol extends junit.framework.TestCase {
    * Pipeline setup with DN0 very slow but it won't lead to timeout.
    * Client finishes setup successfully.
    */
-  public void testPipelineFi06() throws IOException {
+  @Test
+  public void pipeline_Fi_06() throws IOException {
     final String methodName = FiTestUtil.getMethodName();
     runSlowDatanodeTest(methodName, new SleepAction(methodName, 0, 3000));
   }
@@ -99,24 +104,54 @@ public class TestFiDataTransferProtocol extends junit.framework.TestCase {
    * Pipeline setup with DN1 very slow but it won't lead to timeout.
    * Client finishes setup successfully.
    */
-  public void testPipelineFi07() throws IOException {
+  @Test
+  public void pipeline_Fi_07() throws IOException {
     final String methodName = FiTestUtil.getMethodName();
     runSlowDatanodeTest(methodName, new SleepAction(methodName, 1, 3000));
   }
 
+  /**
+   * Pipeline setup with DN2 very slow but it won't lead to timeout.
+   * Client finishes setup successfully.
+   */
+  @Test
+  public void pipeline_Fi_08() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runSlowDatanodeTest(methodName, new SleepAction(methodName, 2, 3000));
+  }
+
   private static void runCallReceivePacketTest(String methodName,
-      Action<DataNode> a) throws IOException {
+      Action<DatanodeID> a) throws IOException {
     FiTestUtil.LOG.info("Running " + methodName + " ...");
-    DataTransferTestUtil.initTest().fiCallReceivePacket.set(a);
+    ((DataTransferTest)DataTransferTestUtil.initTest()).fiCallReceivePacket.set(a);
+    write1byte(methodName);
+  }
+
+  private static void runStatusReadTest(String methodName, Action<DatanodeID> a
+      ) throws IOException {
+    FiTestUtil.LOG.info("Running " + methodName + " ...");
+    ((DataTransferTest)DataTransferTestUtil.initTest()).fiStatusRead.set(a);
     write1byte(methodName);
   }
 
+  /**
+   * Pipeline setup, DN1 throws an OutOfMemoryException right after it
+   * received a setup ack from DN2.
+   * Client gets an IOException and determine DN1 bad.
+   */
+  @Test
+  public void pipeline_Fi_12() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runStatusReadTest(methodName, new OomAction(methodName, 1));
+  }
+
   /**
    * Streaming: Write a packet, DN0 throws a DiskOutOfSpaceError
    * when it writes the data to disk.
    * Client gets an IOException and determine DN0 bad.
    */
-  public void testPipelineFi14() throws IOException {
+  @Test
+  public void pipeline_Fi_14() throws IOException {
     final String methodName = FiTestUtil.getMethodName();
     runCallReceivePacketTest(methodName, new DoosAction(methodName, 0));
   }
@@ -126,8 +161,20 @@ public class TestFiDataTransferProtocol extends junit.framework.TestCase {
    * when it writes the data to disk.
    * Client gets an IOException and determine DN1 bad.
    */
-  public void testPipelineFi15() throws IOException {
+  @Test
+  public void pipeline_Fi_15() throws IOException {
     final String methodName = FiTestUtil.getMethodName();
     runCallReceivePacketTest(methodName, new DoosAction(methodName, 1));
   }
+
+  /**
+   * Streaming: Write a packet, DN2 throws a DiskOutOfSpaceError
+   * when it writes the data to disk.
+   * Client gets an IOException and determine DN2 bad.
+   */
+  @Test
+  public void pipeline_Fi_16() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runCallReceivePacketTest(methodName, new DoosAction(methodName, 2));
+  }
 }

+ 4 - 0
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

@@ -655,4 +655,8 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
   public String getStorageInfo() {
     return "Simulated FSDataset-" + storageId;
   }
+  
+  public boolean hasEnoughResource() {
+    return true;
+  }
 }

+ 369 - 0
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java

@@ -0,0 +1,369 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.net.NetUtils;
+import org.junit.After;
+import org.junit.Before;
+
+public class TestDataNodeVolumeFailure extends TestCase{
+  final private int block_size = 512;
+  MiniDFSCluster cluster = null;
+  int dn_num = 2;
+  int blocks_num = 30;
+  short repl=2;
+  File dataDir = null;
+  File data_fail = null;
+  File failedDir = null;
+  
+  // mapping blocks to Meta files(physical files) and locs(NameNode locations)
+  private class BlockLocs {
+    public int num_files = 0;
+    public int num_locs = 0;
+  }
+  // block id to BlockLocs
+  Map<String, BlockLocs> block_map = new HashMap<String, BlockLocs> ();
+
+  @Before
+  public void setUp() throws Exception {
+    
+    // bring up a cluster of 2
+    Configuration conf = new Configuration();
+    conf.setLong("dfs.block.size", block_size);
+    cluster = new MiniDFSCluster(conf, dn_num, true, null);
+    cluster.waitActive();
+  }
+  
+  
+  
+  public void testVolumeFailure() throws IOException {
+    FileSystem fs = cluster.getFileSystem();
+    dataDir = new File(cluster.getDataDirectory());
+    System.out.println("Data dir: is " +  dataDir.getPath());
+   
+    
+    // Data dir structure is dataDir/data[1-4]/[current,tmp...]
+    // data1,2 is for datanode 1, data3,4 - for datanode 2
+    String filename = "/test.txt";
+    Path filePath = new Path(filename);
+    
+    // we use only a small number of blocks to avoid creating subdirs in the data dir..
+    int filesize = block_size*blocks_num;
+    DFSTestUtil.createFile(fs, filePath, filesize, repl, 1L);
+    DFSTestUtil.waitReplication(fs, filePath, repl);
+    System.out.println("file " + filename + "(size " +
+        filesize + ") is created and replicated");
+   
+    // fail the volume
+    // delete/make non-writable one of the directories (failed volume)
+    data_fail = new File(dataDir, "data3");
+    failedDir = new File(data_fail, "current");
+    if (failedDir.exists() &&
+        //!FileUtil.fullyDelete(failedDir)
+        !deleteBlocks(failedDir)
+        ) {
+      throw new IOException("Could not delete hdfs directory '" + failedDir + "'");
+    }    
+    data_fail.setReadOnly();
+    failedDir.setReadOnly();
+    System.out.println("Deleteing " + failedDir.getPath() + "; exist=" + failedDir.exists());
+    
+    // access all the blocks on the "failed" DataNode, 
+    // we need to make sure that the "failed" volume is being accessed - 
+    // and that will cause failure, blocks removal, "emergency" block report
+    triggerFailure(filename, filesize);
+    
+    // make sure a block report is sent 
+    DataNode dn = cluster.getDataNodes().get(1); //corresponds to dir data3
+    cluster.getNameNode().blockReport(dn.dnRegistration,
+        BlockListAsLongs.convertToArrayLongs(cluster.getBlockReport(1)));
+
+    // verify number of blocks and files...
+    verify(filename, filesize);
+    
+    // create another file (with one volume failed).
+    System.out.println("creating file test1.txt");
+    Path fileName1 = new Path("/test1.txt");
+    DFSTestUtil.createFile(fs, fileName1, filesize, repl, 1L);
+    
+    
+    // should be able to replicate to both nodes (2 DN, repl=2)
+    DFSTestUtil.waitReplication(fs, fileName1, repl);
+    System.out.println("file " + fileName1.getName() + 
+        " is created and replicated");
+    
+  }
+  
+  /**
+   * verifies two things:
+   *  1. number of locations of each block in the name node
+   *   matches number of actual files
+   *  2. block files + pending blocks equal the total number of blocks that a file has,
+   *     including replication (the HDFS file has 30 blocks, repl=2 - 60 in total)
+   * @param fn - file name
+   * @param fs - file size
+   * @throws IOException
+   */
+  private void verify(String fn, int fs) throws IOException{
+    // now count how many physical blocks are there
+    int totalReal = countRealBlocks(block_map);
+    System.out.println("countRealBlocks counted " + totalReal + " blocks");
+
+    // count how many blocks store in NN structures.
+    int totalNN = countNNBlocks(block_map, fn, fs);
+    System.out.println("countNNBlocks counted " + totalNN + " blocks");
+
+    for(String bid : block_map.keySet()) {
+      BlockLocs bl = block_map.get(bid);
+      // System.out.println(bid + "->" + bl.num_files + "vs." + bl.num_locs);
+      // number of physical files (1 or 2) should be same as number of datanodes
+      // in the list of the block locations
+      assertEquals(bl.num_files, bl.num_locs);
+    }
+    // verify we have the same number of physical blocks and stored in NN
+    assertEquals(totalReal, totalNN);
+
+    // now check the number of under-replicated blocks
+    FSNamesystem fsn = cluster.getNamesystem();
+    // force update of all the metric counts by calling computeDatanodeWork
+    fsn.computeDatanodeWork();
+    // get all the counts 
+    long underRepl = fsn.getUnderReplicatedBlocks();
+    long pendRepl = fsn.getPendingReplicationBlocks();
+    long totalRepl = underRepl + pendRepl;
+    System.out.println("underreplicated after = "+ underRepl + 
+        " and pending repl ="  + pendRepl + "; total underRepl = " + totalRepl);
+
+    System.out.println("total blocks (real and replicating):" + 
+        (totalReal + totalRepl) + " vs. all files blocks " + blocks_num*2);
+
+    // together all the blocks should be equal to all real + all underreplicated
+    assertEquals(totalReal + totalRepl, blocks_num*repl);
+  }
+  
+  /**
+   * go to each block on the 2nd DataNode until it fails...
+   * @param path
+   * @param size
+   * @throws IOException
+   */
+  private void triggerFailure(String path, long size) throws IOException {
+    NameNode nn = cluster.getNameNode();
+    List<LocatedBlock> locatedBlocks = nn.getBlockLocations(path, 0, size).getLocatedBlocks();
+//    System.out.println("Number of blocks: " + locatedBlocks.size()); 
+    
+    for(LocatedBlock lb : locatedBlocks) {
+      DatanodeInfo dinfo = lb.getLocations()[1];
+      Block b = lb.getBlock();
+    //  System.out.println(i++ + ". " + b.getBlockName());
+      try {
+        accessBlock(dinfo, lb);
+      } catch (IOException e) {
+        System.out.println("Failure triggered, on block: " + b.getBlockId() +  
+            "; corresponding volume should be removed by now");
+        break;
+      }
+    }
+  }
+  
+  /**
+   * simulate a failure by deleting all the block files
+   * @param dir
+   * @throws IOException
+   */
+  private boolean deleteBlocks(File dir) {
+    
+    File [] fileList = dir.listFiles();
+    for(File f : fileList) {
+      if(f.getName().startsWith("blk_")) {
+        if(!f.delete())
+          return false;
+        
+      }
+    }
+    
+    return true;
+  }
+  
+  /**
+   * try to access a block on a data node. If it fails, an exception is thrown.
+   * @param datanode
+   * @param lblock
+   * @throws IOException
+   */
+  private void accessBlock(DatanodeInfo datanode, LocatedBlock lblock)
+    throws IOException {
+    InetSocketAddress targetAddr = null;
+    Socket s = null;
+    DFSClient.BlockReader blockReader = null; 
+    Block block = lblock.getBlock(); 
+   
+    targetAddr = NetUtils.createSocketAddr(datanode.getName());
+      
+    s = new Socket();
+    s.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
+    s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
+
+    blockReader = 
+      DFSClient.BlockReader.newBlockReader(s, targetAddr.toString() + ":" + 
+          block.getBlockId(), 
+          block.getBlockId(), 
+          lblock.getAccessToken(),
+          block.getGenerationStamp(), 
+          0, -1, 4096);
+
+    // nothing - if it fails, it will throw an exception
+  }
+  
+  /**
+   * Count datanodes that have copies of the blocks for a file
+   * put it into the map
+   * @param map
+   * @param path
+   * @param size
+   * @return
+   * @throws IOException
+   */
+  private int countNNBlocks(Map<String, BlockLocs> map, String path, long size) 
+    throws IOException {
+    int total = 0;
+    
+    NameNode nn = cluster.getNameNode();
+    List<LocatedBlock> locatedBlocks = 
+      nn.getBlockLocations(path, 0, size).getLocatedBlocks();
+    //System.out.println("Number of blocks: " + locatedBlocks.size()); 
+        
+    for(LocatedBlock lb : locatedBlocks) {
+      String blockId = ""+lb.getBlock().getBlockId();
+      //System.out.print(blockId + ": ");
+      DatanodeInfo[] dn_locs = lb.getLocations();
+      BlockLocs bl = map.get(blockId);
+      if(bl == null) {
+        bl = new BlockLocs();
+      }
+      //System.out.print(dn_info.name+",");
+      total += dn_locs.length;        
+      bl.num_locs += dn_locs.length;
+      map.put(blockId, bl);
+      //System.out.println();
+    }
+    return total;
+  }
+  
+  /**
+   *  look for real blocks
+   *  by counting *.meta files in all the storage dirs 
+   * @param map
+   * @return
+   */
+
+  private int countRealBlocks(Map<String, BlockLocs> map) {
+    int total = 0;
+    for(int i=0; i<dn_num; i++) {
+      for(int j=1; j<=2; j++) {
+        File dir = new File(new File(dataDir, "data"+(2*i+j)), "current");
+        if(dir == null) {
+          System.out.println("dir is null for dn=" + i + " and data_dir=" + j);
+          continue;
+        }
+      
+        String [] res = metaFilesInDir(dir);
+        if(res == null) {
+          System.out.println("res is null for dir = " + dir + " i=" + i + " and j=" + j);
+          continue;
+        }
+        //System.out.println("for dn" + i + "." + j + ": " + dir + "=" + res.length+ " files");
+      
+        //int ii = 0;
+        for(String s: res) {
+          // cut off "blk_-" at the beginning and ".meta" at the end
+          assertNotNull(s);
+          String bid = s.substring(s.indexOf("_")+1, s.lastIndexOf("_"));
+          //System.out.println(ii++ + ". block " + s + "; id=" + bid);
+          BlockLocs val = map.get(bid);
+          if(val == null) {
+            val = new BlockLocs();
+          }
+          val.num_files ++; // one more file for the block
+          map.put(bid, val);
+
+        }
+        //System.out.println("dir1="+dir.getPath() + "blocks=" + res.length);
+        //System.out.println("dir2="+dir2.getPath() + "blocks=" + res2.length);
+
+        total += res.length;
+      }
+    }
+    return total;
+  }
+
+  /*
+   * count how many files *.meta are in the dir
+   */
+  private String [] metaFilesInDir(File dir) {
+    String [] res = dir.list(
+        new FilenameFilter() {
+          public boolean accept(File dir, String name) {
+            return name.startsWith("blk_") &&
+            name.endsWith(FSDataset.METADATA_EXTENSION);
+          }
+        }
+    );
+    return res;
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if(data_fail != null) {
+      data_fail.setWritable(true);
+    }
+    if(failedDir != null) {
+      failedDir.setWritable(true);
+    }
+    if(cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+}

+ 1 - 0
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java

@@ -162,6 +162,7 @@ public class TestFsck extends TestCase {
     try {
       Configuration conf = new Configuration();
       conf.setLong("dfs.blockreport.intervalMsec", 10000L);
+      conf.setInt("dfs.datanode.directoryscan.interval", 1);
       cluster = new MiniDFSCluster(conf, 4, true, null);
       String topDir = "/srcdat";
       fs = cluster.getFileSystem();