Browse Source

Merging change r1034932 from trunk into federation

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-1052@1078113 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas 14 years ago
parent
commit
3b318d46d0

+ 3 - 0
CHANGES.txt

@@ -289,6 +289,9 @@ Release 0.22.0 - Unreleased
 
     HDFS-1164. TestHdfsProxy is failing. (Todd Lipcon via cos)
 
+    HDFS-811. Add metrics, failure reporting and additional tests for HDFS-457.
+    (eli)
+
   IMPROVEMENTS
 
     HDFS-1304. Add a new unit test for HftpFileSystem.open(..).  (szetszwo)

+ 2 - 0
src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -1683,6 +1683,8 @@ public class DataNode extends Configured
     // shutdown the DN completely.
     int dpError = hasEnoughResources ? DatanodeProtocol.DISK_ERROR  
                                      : DatanodeProtocol.FATAL_DISK_ERROR;  
+    myMetrics.volumesFailed.inc(1);
+
     //inform NameNodes
     for(BPOfferService bpos: blockPoolManager.getAllNamenodeThreads()) {
       DatanodeProtocol nn = bpos.bpNamenode;

+ 3 - 0
src/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java

@@ -76,6 +76,9 @@ public class DataNodeMetrics implements Updater {
               new MetricsTimeVaryingInt("writes_from_local_client", registry);
   public MetricsTimeVaryingInt writesFromRemoteClient = 
               new MetricsTimeVaryingInt("writes_from_remote_client", registry);
+
+  public MetricsTimeVaryingInt volumesFailed =
+    new MetricsTimeVaryingInt("volumes_failed", registry);
   
   public MetricsTimeVaryingRate readBlockOp = 
                 new MetricsTimeVaryingRate("readBlockOp", registry);

+ 23 - 2
src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java

@@ -120,7 +120,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
   private long lastBlocksScheduledRollTime = 0;
   private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min
   private int volumeFailures = 0;
-  
   /** 
    * When set to true, the node is not in include list and is not allowed
    * to communicate with the namenode
@@ -658,7 +657,13 @@ public class DatanodeDescriptor extends DatanodeInfo {
       return startTime;
     }
   }  // End of class DecommissioningStatus
-  
+
+  /**
+   * Increment the volume failure count.
+   */
+  public void incVolumeFailure() {
+    volumeFailures++;
+  }
   
   /**
    * Set the flag to indicate if this datanode is disallowed from communicating
@@ -671,4 +676,20 @@ public class DatanodeDescriptor extends DatanodeInfo {
   boolean isDisallowed() {
     return disallowed;
   }
+
+  /**
+   * @return number of failed volumes in the datanode.
+   */
+  public int getVolumeFailures() {
+    return volumeFailures;
+  }
+
+  /**
+   * Reset the volume failure count when a DN re-registers.
+   * @param nodeReg DatanodeID to update registration for.
+   */
+  public void updateRegInfo(DatanodeID nodeReg) {
+    super.updateRegInfo(nodeReg);
+    volumeFailures = 0;
+  }
 }

+ 24 - 5
src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -2906,6 +2906,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
    * The datanode will be informed of this work at the next heartbeat.
    * 
    * @return number of blocks scheduled for replication or removal.
+   * @throws IOException
    */
   public int computeDatanodeWork() throws IOException {
     int workFound = 0;
@@ -2940,8 +2941,25 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
   }
 
   /**
-   * remove a datanode descriptor
-   * @param nodeID datanode ID
+   * Update the descriptor for the datanode to reflect a volume failure.
+   * @param nodeID DatanodeID to update count for.
+   * @throws IOException
+   */
+  synchronized public void incVolumeFailure(DatanodeID nodeID)
+    throws IOException {
+    DatanodeDescriptor nodeInfo = getDatanode(nodeID);
+    if (nodeInfo != null) {
+      nodeInfo.incVolumeFailure();
+    } else {
+      NameNode.stateChangeLog.warn("BLOCK* NameSystem.incVolumeFailure: "
+                                   + nodeID.getName() + " does not exist");
+    }
+  }
+
+  /**
+   * Remove a datanode descriptor.
+   * @param nodeID datanode ID.
+   * @throws IOException
    */
   public void removeDatanode(DatanodeID nodeID) 
     throws IOException {
@@ -2960,8 +2978,8 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
   }
   
   /**
-   * remove a datanode descriptor
-   * @param nodeInfo datanode descriptor
+   * Remove a datanode descriptor.
+   * @param nodeInfo datanode descriptor.
    */
   private void removeDatanode(DatanodeDescriptor nodeInfo) {
     synchronized (heartbeats) {
@@ -3008,8 +3026,9 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
 
   /**
    * Physically remove node from datanodeMap.
-   * 
+   *
    * @param nodeID node
+   * @throws IOException
    */
   void wipeDatanode(DatanodeID nodeID) throws IOException {
     String key = nodeID.getStorageID();

+ 1 - 0
src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java

@@ -1294,6 +1294,7 @@ public class NameNode implements NamenodeProtocols, FSConstants {
       return;
     }
     verifyRequest(nodeReg);
+    namesystem.incVolumeFailure(nodeReg);
     if (errorCode == DatanodeProtocol.DISK_ERROR) {
       LOG.warn("Volume failed on " + dnName); 
     } else if (errorCode == DatanodeProtocol.FATAL_DISK_ERROR) {

+ 36 - 35
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java

@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
@@ -27,8 +26,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import junit.framework.TestCase;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -45,10 +42,16 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.net.NetUtils;
+
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
 
-public class TestDataNodeVolumeFailure extends TestCase{
+/**
+ * Fine-grain testing of block files and locations after volume failure.
+ */
+public class TestDataNodeVolumeFailure {
   final private int block_size = 512;
   MiniDFSCluster cluster = null;
   int dn_num = 2;
@@ -68,7 +71,6 @@ public class TestDataNodeVolumeFailure extends TestCase{
 
   @Before
   public void setUp() throws Exception {
-    
     // bring up a cluster of 2
     Configuration conf = new HdfsConfiguration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, block_size);
@@ -77,9 +79,26 @@ public class TestDataNodeVolumeFailure extends TestCase{
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(dn_num).build();
     cluster.waitActive();
   }
+
+  @After
+  public void tearDown() throws Exception {
+    if(data_fail != null) {
+      data_fail.setWritable(true);
+    }
+    if(failedDir != null) {
+      failedDir.setWritable(true);
+    }
+    if(cluster != null) {
+      cluster.shutdown();
+    }
+  }
   
-  
-  
+  /*
+   * Verify the number of blocks and files are correct after volume failure,
+   * and that we can replicate to both datanodes even after a single volume
+   * failure if the configuration parameter allows this.
+   */
+  @Test
   public void testVolumeFailure() throws IOException {
     FileSystem fs = cluster.getFileSystem();
     dataDir = new File(cluster.getDataDirectory());
@@ -133,12 +152,10 @@ public class TestDataNodeVolumeFailure extends TestCase{
     Path fileName1 = new Path("/test1.txt");
     DFSTestUtil.createFile(fs, fileName1, filesize, repl, 1L);
     
-    
     // should be able to replicate to both nodes (2 DN, repl=2)
     DFSTestUtil.waitReplication(fs, fileName1, repl);
     System.out.println("file " + fileName1.getName() + 
         " is created and replicated");
-    
   }
   
   /**
@@ -165,10 +182,11 @@ public class TestDataNodeVolumeFailure extends TestCase{
       // System.out.println(bid + "->" + bl.num_files + "vs." + bl.num_locs);
       // number of physical files (1 or 2) should be same as number of datanodes
       // in the list of the block locations
-      assertEquals(bl.num_files, bl.num_locs);
+      assertEquals("Num files should match num locations",
+          bl.num_files, bl.num_locs);
     }
-    // verify we have the same number of physical blocks and stored in NN
-    assertEquals(totalReal, totalNN);
+    assertEquals("Num physical blocks should match num stored in the NN",
+        totalReal, totalNN);
 
     // now check the number of under-replicated blocks
     FSNamesystem fsn = cluster.getNamesystem();
@@ -185,7 +203,8 @@ public class TestDataNodeVolumeFailure extends TestCase{
         (totalReal + totalRepl) + " vs. all files blocks " + blocks_num*2);
 
     // together all the blocks should be equal to all real + all underreplicated
-    assertEquals(totalReal + totalRepl, blocks_num*repl);
+    assertEquals("Incorrect total block count",
+        totalReal + totalRepl, blocks_num * repl);
   }
   
   /**
@@ -196,13 +215,12 @@ public class TestDataNodeVolumeFailure extends TestCase{
    */
   private void triggerFailure(String path, long size) throws IOException {
     NameNode nn = cluster.getNameNode();
-    List<LocatedBlock> locatedBlocks = nn.getBlockLocations(path, 0, size).getLocatedBlocks();
-//    System.out.println("Number of blocks: " + locatedBlocks.size()); 
+    List<LocatedBlock> locatedBlocks =
+      nn.getBlockLocations(path, 0, size).getLocatedBlocks();
     
-    for(LocatedBlock lb : locatedBlocks) {
+    for (LocatedBlock lb : locatedBlocks) {
       DatanodeInfo dinfo = lb.getLocations()[1];
       ExtendedBlock b = lb.getBlock();
-    //  System.out.println(i++ + ". " + b.getBlockName());
       try {
         accessBlock(dinfo, lb);
       } catch (IOException e) {
@@ -219,7 +237,6 @@ public class TestDataNodeVolumeFailure extends TestCase{
    * @throws IOException
    */
   private boolean deteteBlocks(File dir) {
-    
     File [] fileList = dir.listFiles();
     for(File f : fileList) {
       if(f.getName().startsWith("blk_")) {
@@ -228,7 +245,6 @@ public class TestDataNodeVolumeFailure extends TestCase{
         
       }
     }
-    
     return true;
   }
   
@@ -301,7 +317,6 @@ public class TestDataNodeVolumeFailure extends TestCase{
    * @param map
    * @return
    */
-
   private int countRealBlocks(Map<String, BlockLocs> map) {
     int total = 0;
     final String bpid = cluster.getNamesystem().getBlockPoolId();
@@ -324,7 +339,7 @@ public class TestDataNodeVolumeFailure extends TestCase{
         //int ii = 0;
         for(String s: res) {
           // cut off "blk_-" at the beginning and ".meta" at the end
-          assertNotNull(s);
+          assertNotNull("Block file name should not be null", s);
           String bid = s.substring(s.indexOf("_")+1, s.lastIndexOf("_"));
           //System.out.println(ii++ + ". block " + s + "; id=" + bid);
           BlockLocs val = map.get(bid);
@@ -358,18 +373,4 @@ public class TestDataNodeVolumeFailure extends TestCase{
     );
     return res;
   }
-
-  @After
-  public void tearDown() throws Exception {
-    if(data_fail != null) {
-      data_fail.setWritable(true);
-    }
-    if(failedDir != null) {
-      failedDir.setWritable(true);
-    }
-    if(cluster != null) {
-      cluster.shutdown();
-    }
-  }
-
 }

+ 377 - 0
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java

@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.File;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
+import org.apache.hadoop.hdfs.server.namenode.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.log4j.Level;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ * Test successive volume failures, failure metrics and capacity reporting.
+ */
+public class TestDataNodeVolumeFailureReporting {
+
+  private static final Log LOG = LogFactory.getLog(TestDataNodeVolumeFailureReporting.class);
+  {
+    ((Log4JLogger)TestDataNodeVolumeFailureReporting.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  private FileSystem fs;
+  private MiniDFSCluster cluster;
+  private Configuration conf;
+  private String dataDir;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 512L);
+    /*
+     * Lower the DN heartbeat, DF rate, and recheck interval to one second
+     * so state about failures and datanode death propagates faster.
+     */
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_DF_INTERVAL_KEY, 1000);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1000);
+    // Allow a single volume failure (there are two volumes)
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+    dataDir = cluster.getDataDirectory();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    cluster.shutdown();
+  }
+
+  /**
+   * Test that individual volume failures do not cause DNs to fail, that
+   * all volumes failed on a single datanode do cause it to fail, and
+   * that the capacities and liveliness is adjusted correctly in the NN.
+   */
+  @Test
+  public void testSuccessiveVolumeFailures() throws Exception {
+    if (System.getProperty("os.name").startsWith("Windows")) {
+      // See above
+      return;
+    }
+    // Bring up two more datanodes
+    cluster.startDataNodes(conf, 2, true, null, null);
+    cluster.waitActive();
+
+    /*
+     * Sleep at least 3 seconds (a 1s heartbeat plus padding) to allow
+     * for heartbeats to propagate from the datanodes to the namenode.
+     * Sleep  at least (2 * re-check + 10 * heartbeat) 12 seconds for
+     * a datanode  to be called dead by the namenode.
+     */
+    final int WAIT_FOR_HEARTBEATS = 3000;
+    final int WAIT_FOR_DEATH = 15000;
+
+    /*
+     * Calculate the total capacity of all the datanodes. Sleep for
+     * three seconds to be sure the datanodes have had a chance to
+     * heartbeat their capacities.
+     */
+    Thread.sleep(WAIT_FOR_HEARTBEATS);
+    FSNamesystem namesystem = cluster.getNamesystem();
+    ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
+    ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
+    namesystem.DFSNodesStatus(live, dead);
+    assertEquals("All DNs should be live", 3, live.size());
+    assertEquals("All DNs should be live", 0, dead.size());
+    long origCapacity = 0;
+    for (final DatanodeDescriptor dn : live) {
+      origCapacity += dn.getCapacity();
+      assertEquals("DN "+dn+" vols should be healthy",
+          0, dn.getVolumeFailures());
+    }
+
+    File dn1Vol1 = new File(dataDir, "data"+(2*0+1));
+    File dn2Vol1 = new File(dataDir, "data"+(2*1+1));
+    File dn3Vol1 = new File(dataDir, "data"+(2*2+1));
+    File dn3Vol2 = new File(dataDir, "data"+(2*2+2));
+
+    /*
+     * Make the 1st volume directories on the first two datanodes
+     * non-accessible.  We don't make all three 1st volume directories
+     * readonly since that would cause the entire pipeline to
+     * fail. The client does not retry failed nodes even though
+     * perhaps they could succeed because just a single volume failed.
+     */
+    assertTrue("Couldn't chmod local vol", dn1Vol1.setExecutable(false));
+    assertTrue("Couldn't chmod local vol", dn2Vol1.setExecutable(false));
+
+    /*
+     * Create file1 and wait for 3 replicas (ie all DNs can still
+     * store a block).  Then assert that all DNs are up, despite the
+     * volume failures.
+     */
+    Path file1 = new Path("/test1");
+    DFSTestUtil.createFile(fs, file1, 1024, (short)3, 1L);
+    DFSTestUtil.waitReplication(fs, file1, (short)3);
+    ArrayList<DataNode> dns = cluster.getDataNodes();
+    assertTrue("DN1 should be up", dns.get(0).isDatanodeUp());
+    assertTrue("DN2 should be up", dns.get(1).isDatanodeUp());
+    assertTrue("DN3 should be up", dns.get(2).isDatanodeUp());
+
+    /*
+     * The metrics should confirm the volume failures.
+     */
+    DataNodeMetrics metrics1 = dns.get(0).getMetrics();
+    DataNodeMetrics metrics2 = dns.get(1).getMetrics();
+    DataNodeMetrics metrics3 = dns.get(2).getMetrics();
+    assertEquals("Vol1 should report 1 failure",
+        1, metrics1.volumesFailed.getCurrentIntervalValue());
+    assertEquals("Vol2 should report 1 failure",
+        1, metrics2.volumesFailed.getCurrentIntervalValue());
+    assertEquals("Vol3 should have no failures",
+        0, metrics3.volumesFailed.getCurrentIntervalValue());
+
+    // Eventually the NN should report two volume failures as well
+    while (true) {
+      Thread.sleep(WAIT_FOR_HEARTBEATS);
+      live.clear();
+      dead.clear();
+      namesystem.DFSNodesStatus(live, dead);
+      int volumeFailures = 0;
+      for (final DatanodeDescriptor dn : live) {
+        volumeFailures += dn.getVolumeFailures();
+      }
+      if (2 == volumeFailures) {
+        break;
+      }
+      LOG.warn("Still waiting for volume failures: "+volumeFailures);
+    }
+
+    /*
+     * Now fail a volume on the third datanode. We should be able to get
+     * three replicas since we've already identified the other failures.
+     */
+    assertTrue("Couldn't chmod local vol", dn3Vol1.setExecutable(false));
+    Path file2 = new Path("/test2");
+    DFSTestUtil.createFile(fs, file2, 1024, (short)3, 1L);
+    DFSTestUtil.waitReplication(fs, file2, (short)3);
+    assertTrue("DN3 should still be up", dns.get(2).isDatanodeUp());
+    assertEquals("Vol3 should report 1 failure",
+        1, metrics3.volumesFailed.getCurrentIntervalValue());
+    live.clear();
+    dead.clear();
+    namesystem.DFSNodesStatus(live, dead);
+    assertEquals("DN3 should have 1 failed volume",
+        1, live.get(2).getVolumeFailures());
+
+    /*
+     * Once the datanodes have a chance to heartbeat their new capacity the
+     * total capacity should be down by three volumes (assuming the host
+     * did not grow or shrink the data volume while the test was running).
+     */
+    while (true) {
+      Thread.sleep(WAIT_FOR_HEARTBEATS);
+      live.clear();
+      dead.clear();
+      namesystem.DFSNodesStatus(live, dead);
+      long currCapacity = 0;
+      long singleVolCapacity = live.get(0).getCapacity();
+      for (final DatanodeDescriptor dn : live) {
+        currCapacity += dn.getCapacity();
+      }
+      LOG.info("Live: "+live.size()+" Dead: "+dead.size());
+      LOG.info("Original capacity: "+origCapacity);
+      LOG.info("Current capacity: "+currCapacity);
+      LOG.info("Volume capacity: "+singleVolCapacity);
+      if (3 == live.size() && 0 == dead.size() &&
+          origCapacity == (currCapacity + (3 * singleVolCapacity))) {
+        break;
+      }
+    }
+
+    /*
+     * Now fail the 2nd volume on the 3rd datanode. All its volumes
+     * are now failed and so it should report two volume failures
+     * and that it's no longer up. Only wait for two replicas since
+     * we'll never get a third.
+     */
+    assertTrue("Couldn't chmod local vol", dn3Vol2.setExecutable(false));
+    Path file3 = new Path("/test3");
+    DFSTestUtil.createFile(fs, file3, 1024, (short)3, 1L);
+    DFSTestUtil.waitReplication(fs, file3, (short)2);
+    // Eventually the DN should go down
+    while (dns.get(2).isDatanodeUp()) {
+      Thread.sleep(1000);
+    }
+    // and report two failed volumes
+    metrics3 = dns.get(2).getMetrics();
+    assertEquals("DN3 should report 2 vol failures",
+        2, metrics3.volumesFailed.getCurrentIntervalValue());
+    // and eventually be seen as dead by the NN.
+    while (true) {
+      Thread.sleep(WAIT_FOR_DEATH);
+      live.clear();
+      dead.clear();
+      namesystem.DFSNodesStatus(live, dead);
+      if (1 == dead.size() && 2 == live.size()) {
+        break;
+      }
+      LOG.warn("Still waiting for dn to die: "+dead.size());
+    }
+
+    /*
+     * The datanode never tries to restore the failed volume, even if
+     * it's subsequently repaired, but it should see this volume on
+     * restart, so file creation should be able to succeed after
+     * restoring the data directories and restarting the datanodes.
+     */
+    assertTrue("Couldn't chmod local vol", dn1Vol1.setExecutable(true));
+    assertTrue("Couldn't chmod local vol", dn2Vol1.setExecutable(true));
+    assertTrue("Couldn't chmod local vol", dn3Vol1.setExecutable(true));
+    assertTrue("Couldn't chmod local vol", dn3Vol2.setExecutable(true));
+    cluster.restartDataNodes();
+    cluster.waitActive();
+    Path file4 = new Path("/test4");
+    DFSTestUtil.createFile(fs, file4, 1024, (short)3, 1L);
+    DFSTestUtil.waitReplication(fs, file4, (short)3);
+
+    /*
+     * Eventually the capacity should be restored to its original value,
+     * and that the volume failure count should be reported as zero by
+     * both the metrics and the NN.
+     */
+    while (true) {
+      Thread.sleep(WAIT_FOR_DEATH);
+      live.clear();
+      dead.clear();
+      namesystem.DFSNodesStatus(live, dead);
+      assertEquals("All DNs should be live", 3, live.size());
+      assertEquals("All DNs should be live", 0, dead.size());
+      long currCapacity = 0;
+      long volFailures = 0;
+      for (final DatanodeDescriptor dn : live) {
+        currCapacity += dn.getCapacity();
+        volFailures += dn.getVolumeFailures();
+      }
+      if (3 == live.size() && 0 == dead.size() && 0 == volFailures &&
+          origCapacity == currCapacity) {
+        break;
+      }
+      LOG.warn("Waiting for capacity: original="+origCapacity+" current="+
+          currCapacity+" live="+live.size()+" dead="+dead.size()+
+          " vols="+volFailures);
+    }
+  }
+
+  /**
+   * Test the DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY configuration
+   * option, ie the DN shuts itself down when the number of failures
+   * experienced drops below the tolerated amount.
+   */
+  @Test
+  public void testConfigureMinValidVolumes() throws Exception {
+    if (System.getProperty("os.name").startsWith("Windows")) {
+      // See above
+      return;
+    }
+
+    // Bring up two additional datanodes that need both of their volumes
+    // functioning in order to stay up.
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 0);
+    cluster.startDataNodes(conf, 2, true, null, null);
+    cluster.waitActive();
+
+    // Fail a volume on the 2nd DN
+    File dn2Vol1 = new File(dataDir, "data"+(2*1+1));
+    assertTrue("Couldn't chmod local vol", dn2Vol1.setExecutable(false));
+
+    // Should only get two replicas (the first DN and the 3rd)
+    Path file1 = new Path("/test1");
+    DFSTestUtil.createFile(fs, file1, 1024, (short)3, 1L);
+    DFSTestUtil.waitReplication(fs, file1, (short)2);
+
+    // Check that this single failure caused a DN to die.
+    while (true) {
+      final int WAIT_FOR_DEATH = 15000;
+      Thread.sleep(WAIT_FOR_DEATH);
+      FSNamesystem namesystem = cluster.getNamesystem();
+      ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
+      ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
+      namesystem.DFSNodesStatus(live, dead);
+      if (1 == dead.size()) {
+        break;
+      }
+      LOG.warn("Waiting for datanode to die: "+dead.size());
+    }
+
+    // If we restore the volume we should still only be able to get
+    // two replicas since the DN is still considered dead.
+    assertTrue("Couldn't chmod local vol", dn2Vol1.setExecutable(true));
+    Path file2 = new Path("/test2");
+    DFSTestUtil.createFile(fs, file2, 1024, (short)3, 1L);
+    DFSTestUtil.waitReplication(fs, file2, (short)2);
+  }
+
+  /**
+   * Test invalid DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY values.
+   */
+  @Test
+  public void testInvalidFailedVolumesConfig() throws Exception {
+    if (System.getProperty("os.name").startsWith("Windows")) {
+      // See above
+      return;
+    }
+    /*
+     * Bring up another datanode that has an invalid value set.
+     * We should still be able to create a file with two replicas
+     * since the minimum valid volume parameter is only checked
+     * when we experience a disk error.
+     */
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, -1);
+    cluster.startDataNodes(conf, 1, true, null, null);
+    cluster.waitActive();
+    Path file1 = new Path("/test1");
+    DFSTestUtil.createFile(fs, file1, 1024, (short)2, 1L);
+    DFSTestUtil.waitReplication(fs, file1, (short)2);
+    // Ditto if the value is too big.
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 100);
+    cluster.startDataNodes(conf, 1, true, null, null);
+    cluster.waitActive();
+    Path file2 = new Path("/test1");
+    DFSTestUtil.createFile(fs, file2, 1024, (short)2, 1L);
+    DFSTestUtil.waitReplication(fs, file2, (short)2);
+  }
+}

+ 108 - 98
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java

@@ -17,14 +17,11 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-
 import java.io.DataOutputStream;
 import java.io.File;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 
-import junit.framework.TestCase;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -41,8 +38,40 @@ import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 
-/** Test if a datanode can correctly handle errors during block read/write*/
-public class TestDiskError extends TestCase {
+import org.junit.Test;
+import org.junit.Before;
+import org.junit.After;
+import static org.junit.Assert.*;
+
+/**
+ * Test that datanodes can correctly handle errors during block read/write.
+ */
+public class TestDiskError {
+
+  private FileSystem fs;
+  private MiniDFSCluster cluster;
+  private Configuration conf;
+  private String dataDir;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 512L);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+    dataDir = cluster.getDataDirectory();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    cluster.shutdown();
+  }
+
+  /**
+   * Test to check that a DN goes down when all its volumes have failed.
+   */
+  @Test
   public void testShutdown() throws Exception {
     if (System.getProperty("os.name").startsWith("Windows")) {
       /**
@@ -53,12 +82,9 @@ public class TestDiskError extends TestCase {
        */
       return;
     }
-    // bring up a cluster of 3
-    Configuration conf = new HdfsConfiguration();
-    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 512L);
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    // Bring up two more datanodes
+    cluster.startDataNodes(conf, 2, true, null, null);
     cluster.waitActive();
-    FileSystem fs = cluster.getFileSystem();
     final int dnIndex = 0;
     String bpid = cluster.getNamesystem().getBlockPoolId();
     File storageDir = MiniDFSCluster.getStorageDir(dnIndex, 0);
@@ -67,8 +93,8 @@ public class TestDiskError extends TestCase {
     File dir2 = MiniDFSCluster.getRbwDir(storageDir, bpid);
     try {
       // make the data directory of the first datanode to be readonly
-      assertTrue(dir1.setReadOnly());
-      assertTrue(dir2.setReadOnly());
+      assertTrue("Couldn't chmod local vol", dir1.setReadOnly());
+      assertTrue("Couldn't chmod local vol", dir2.setReadOnly());
 
       // create files and make sure that first datanode will be down
       DataNode dn = cluster.getDataNodes().get(dnIndex);
@@ -82,108 +108,92 @@ public class TestDiskError extends TestCase {
       // restore its old permission
       dir1.setWritable(true);
       dir2.setWritable(true);
-      cluster.shutdown();
     }
   }
-  
+
+  /**
+   * Test that when there is a failure replicating a block the temporary
+   * and meta files are cleaned up and subsequent replication succeeds.
+   */
+  @Test
   public void testReplicationError() throws Exception {
-    // bring up a cluster of 1
-    Configuration conf = new HdfsConfiguration();
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    // create a file of replication factor of 1
+    final Path fileName = new Path("/test.txt");
+    final int fileLen = 1;
+    DFSTestUtil.createFile(fs, fileName, 1, (short)1, 1L);
+    DFSTestUtil.waitReplication(fs, fileName, (short)1);
+
+    // get the block belonged to the created file
+    LocatedBlocks blocks = NameNodeAdapter.getBlockLocations(
+        cluster.getNameNode(), fileName.toString(), 0, (long)fileLen);
+    assertEquals("Should only find 1 block", blocks.locatedBlockCount(), 1);
+    LocatedBlock block = blocks.get(0);
+
+    // bring up a second datanode
+    cluster.startDataNodes(conf, 1, true, null, null);
     cluster.waitActive();
-    FileSystem fs = cluster.getFileSystem();
+    final int sndNode = 1;
+    DataNode datanode = cluster.getDataNodes().get(sndNode);
     
-    try {
-      // create a file of replication factor of 1
-      final Path fileName = new Path("/test.txt");
-      final int fileLen = 1;
-      DFSTestUtil.createFile(fs, fileName, 1, (short)1, 1L);
-      DFSTestUtil.waitReplication(fs, fileName, (short)1);
-
-      // get the block belonged to the created file
-      LocatedBlocks blocks = NameNodeAdapter.getBlockLocations(
-          cluster.getNameNode(), fileName.toString(), 0, (long)fileLen);
-      assertEquals(blocks.locatedBlockCount(), 1);
-      LocatedBlock block = blocks.get(0);
-      
-      // bring up a second datanode
-      cluster.startDataNodes(conf, 1, true, null, null);
-      cluster.waitActive();
-      final int sndNode = 1;
-      DataNode datanode = cluster.getDataNodes().get(sndNode);
-      
-      // replicate the block to the second datanode
-      InetSocketAddress target = datanode.getSelfAddr();
-      Socket s = new Socket(target.getAddress(), target.getPort());
-        //write the header.
-      DataOutputStream out = new DataOutputStream(
-          s.getOutputStream());
-
-      Sender.opWriteBlock(out, block.getBlock(), 1, 
-          BlockConstructionStage.PIPELINE_SETUP_CREATE, 
-          0L, 0L, 0L, "", null, new DatanodeInfo[0], 
-          BlockTokenSecretManager.DUMMY_TOKEN);
-
-      // write check header
-      out.writeByte( 1 );
-      out.writeInt( 512 );
-
-      out.flush();
-
-      // close the connection before sending the content of the block
-      out.close();
-      
-      // the temporary block & meta files should be deleted
-      String bpid = cluster.getNamesystem().getBlockPoolId();
-      File storageDir = MiniDFSCluster.getStorageDir(sndNode, 0);
-      File dir1 = MiniDFSCluster.getRbwDir(storageDir, bpid);
-      storageDir = MiniDFSCluster.getStorageDir(sndNode, 1);
-      File dir2 = MiniDFSCluster.getRbwDir(storageDir, bpid);
-      while (dir1.listFiles().length != 0 || dir2.listFiles().length != 0) {
-        Thread.sleep(100);
-      }
-      
-      // then increase the file's replication factor
-      fs.setReplication(fileName, (short)2);
-      // replication should succeed
-      DFSTestUtil.waitReplication(fs, fileName, (short)1);
-      
-      // clean up the file
-      fs.delete(fileName, false);
-    } finally {
-      cluster.shutdown();
+    // replicate the block to the second datanode
+    InetSocketAddress target = datanode.getSelfAddr();
+    Socket s = new Socket(target.getAddress(), target.getPort());
+    // write the header.
+    DataOutputStream out = new DataOutputStream(s.getOutputStream());
+
+    Sender.opWriteBlock(out, block.getBlock(), 1,
+        BlockConstructionStage.PIPELINE_SETUP_CREATE,
+        0L, 0L, 0L, "", null, new DatanodeInfo[0],
+        BlockTokenSecretManager.DUMMY_TOKEN);
+
+    // write check header
+    out.writeByte( 1 );
+    out.writeInt( 512 );
+    out.flush();
+
+    // close the connection before sending the content of the block
+    out.close();
+
+    // the temporary block & meta files should be deleted
+    String bpid = cluster.getNamesystem().getBlockPoolId();
+    File storageDir = MiniDFSCluster.getStorageDir(sndNode, 0);
+    File dir1 = MiniDFSCluster.getRbwDir(storageDir, bpid);
+    storageDir = MiniDFSCluster.getStorageDir(sndNode, 1);
+    File dir2 = MiniDFSCluster.getRbwDir(storageDir, bpid);
+    while (dir1.listFiles().length != 0 || dir2.listFiles().length != 0) {
+      Thread.sleep(100);
     }
+
+    // then increase the file's replication factor
+    fs.setReplication(fileName, (short)2);
+    // replication should succeed
+    DFSTestUtil.waitReplication(fs, fileName, (short)1);
+
+    // clean up the file
+    fs.delete(fileName, false);
   }
 
+  /**
+   * Check that the permissions of the local DN directories are as expected.
+   */
+  @Test
   public void testLocalDirs() throws Exception {
     Configuration conf = new Configuration();
     final String permStr = "755";
     FsPermission expected = new FsPermission(permStr);
     conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY, permStr);
-    MiniDFSCluster cluster = null;
 
-    try {
-      // Start the cluster
-      cluster = new MiniDFSCluster.Builder(conf).build();
-      cluster.waitActive();
-
-      // Check permissions on directories in 'dfs.data.dir'
-      FileSystem localFS = FileSystem.getLocal(conf);
-      for (DataNode dn : cluster.getDataNodes()) {
-        String[] dataDirs =
-          dn.getConf().getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
-        for (String dir : dataDirs) {
-          Path dataDir = new Path(dir);
-          FsPermission actual = localFS.getFileStatus(dataDir).getPermission();
+    // Check permissions on directories in 'dfs.data.dir'
+    FileSystem localFS = FileSystem.getLocal(conf);
+    for (DataNode dn : cluster.getDataNodes()) {
+      String[] dataDirs =
+        dn.getConf().getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
+      for (String dir : dataDirs) {
+        Path dataDir = new Path(dir);
+        FsPermission actual = localFS.getFileStatus(dataDir).getPermission();
           assertEquals("Permission for dir: " + dataDir + ", is " + actual +
-                           ", while expected is " + expected,
-                       expected, actual);
-        }
+              ", while expected is " + expected, expected, actual);
       }
-    } finally {
-      if (cluster != null)
-        cluster.shutdown();
     }
-
   }
 }