Procházet zdrojové kódy

Merging changes r1036303:r1036692 from trunk to federation

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-1052@1078873 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas před 14 roky
rodič
revize
6345000098

+ 4 - 0
.gitignore

@@ -18,6 +18,10 @@
 .project
 .launches/
 .settings
+*.iml
+*.ipr
+*.iws
+.idea
 .svn
 build/
 build-fi/

+ 8 - 0
CHANGES.txt

@@ -298,6 +298,9 @@ Release 0.22.0 - Unreleased
     HDFS-895. Allow hflush/sync to occur in parallel with new writes
     to the file. (Todd Lipcon via hairong)
 
+    HDFS-528. Add ability for safemode to wait for a minimum number of 
+    live datanodes (Todd Lipcon via eli)
+
   IMPROVEMENTS
 
     HDFS-1304. Add a new unit test for HftpFileSystem.open(..).  (szetszwo)
@@ -625,6 +628,11 @@ Release 0.22.0 - Unreleased
     HDFS-874. TestHDFSFileContextMainOperations fails on weirdly 
     configured DNS hosts. (Todd Lipcon via eli)
 
+    HDFS-1507. TestAbandonBlock should abandon a block. (eli)
+
+    HDFS-1487. FSDirectory.removeBlock() should update diskspace count 
+    of the block owner node (Zhong Wang via eli).
+
 Release 0.21.1 - Unreleased
 
     HDFS-1411. Correct backup node startup command in hdfs user guide.

+ 16 - 1
src/java/hdfs-default.xml

@@ -344,11 +344,26 @@ creations/deletions), or "all".</description>
   <description>
     Specifies the percentage of blocks that should satisfy 
     the minimal replication requirement defined by dfs.namenode.replication.min.
-    Values less than or equal to 0 mean not to start in safe mode.
+    Values less than or equal to 0 mean not to wait for any particular
+    percentage of blocks before exiting safemode.
     Values greater than 1 will make safe mode permanent.
   </description>
 </property>
 
+<property>
+  <name>dfs.namenode.safemode.min.datanodes</name>
+  <value>0</value>
+  <description>
+    Specifies the number of datanodes that must be considered alive
+    before the name node exits safemode.
+    Values less than or equal to 0 mean not to take the number of live
+    datanodes into account when deciding whether to remain in safe mode
+    during startup.
+    Values greater than the number of datanodes in the cluster
+    will make safe mode permanent.
+  </description>
+</property>
+
 <property>
   <name>dfs.namenode.safemode.extension</name>
   <value>30000</value>

+ 2 - 0
src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -58,6 +58,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int     DFS_NAMENODE_SAFEMODE_EXTENSION_DEFAULT = 30000;
   public static final String  DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY = "dfs.namenode.safemode.threshold-pct";
   public static final float   DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT = 0.999f;
+  public static final String  DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY = "dfs.namenode.safemode.min.datanodes";
+  public static final int     DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT = 0;
   public static final String  DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY = "dfs.namenode.secondary.http-address";
   public static final String  DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_DEFAULT = "0.0.0.0:50090";
   public static final String  DFS_NAMENODE_CHECKPOINT_PERIOD_KEY = "dfs.namenode.checkpoint.period";

+ 5 - 0
src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -453,6 +453,11 @@ class FSDirectory implements Closeable {
             +path+" with "+block
             +" block is added to the file system");
       }
+
+      // update space consumed
+      INode[] pathINodes = getExistingPathINodes(path);
+      updateCount(pathINodes, pathINodes.length-1, 0,
+          -fileNode.getPreferredBlockSize()*fileNode.getReplication(), true);
     } finally {
       writeUnlock();
     }

+ 44 - 9
src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -2638,6 +2638,10 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
       // no need to update its timestamp
     // because it is done when the descriptor is created
     }
+
+    if (safeMode != null) {
+      safeMode.checkMode();
+    }
     return;
     } finally {
       writeUnlock();
@@ -2996,6 +3000,10 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
     }
     unprotectedRemoveDatanode(nodeInfo);
     clusterMap.remove(nodeInfo);
+
+    if (safeMode != null) {
+      safeMode.checkMode();
+    }
   }
 
   void unprotectedRemoveDatanode(DatanodeDescriptor nodeDescr) {
@@ -3778,6 +3786,8 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
     // configuration fields
     /** Safe mode threshold condition %.*/
     private double threshold;
+    /** Safe mode minimum number of datanodes alive */
+    private int datanodeThreshold;
     /** Safe mode extension after the threshold. */
     private int extension;
     /** Min replication required by safe mode. */
@@ -3807,6 +3817,9 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
      */
     SafeModeInfo(Configuration conf) {
       this.threshold = conf.getFloat(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, 0.95f);
+      this.datanodeThreshold = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY,
+        DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT);
       this.extension = conf.getInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 0);
       this.safeReplication = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY, 
                                          DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
@@ -3824,6 +3837,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
      */
     private SafeModeInfo() {
       this.threshold = 1.5f;  // this threshold can never be reached
+      this.datanodeThreshold = Integer.MAX_VALUE;
       this.extension = Integer.MAX_VALUE;
       this.safeReplication = Short.MAX_VALUE + 1; // more than maxReplication
       this.blockTotal = -1;
@@ -3916,7 +3930,8 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
      * if DFS is empty or {@link #threshold} == 0
      */
     boolean needEnter() {
-      return threshold != 0 && blockSafe < blockThreshold;
+      return (threshold != 0 && blockSafe < blockThreshold) ||
+        (getNumLiveDataNodes() < datanodeThreshold);
     }
       
     /**
@@ -4006,17 +4021,37 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
       }
       if(blockTotal < 0)
         return leaveMsg + ".";
-      
-      String msg = null;
+
+      int numLive = getNumLiveDataNodes();
+      String msg = "";
       if (reached == 0) {
-        msg = String.format("The reported blocks %d needs additional %d"
-            + " blocks to reach the threshold %.4f of total blocks %d. %s",
-            blockSafe, (blockThreshold - blockSafe), threshold, blockTotal,
-            leaveMsg);
+        if (blockSafe < blockThreshold) {
+          msg += String.format(
+            "The reported blocks %d needs additional %d"
+            + " blocks to reach the threshold %.4f of total blocks %d.",
+            blockSafe, (blockThreshold - blockSafe), threshold, blockTotal);
+        }
+        if (numLive < datanodeThreshold) {
+          if (!"".equals(msg)) {
+            msg += "\n";
+          }
+          msg += String.format(
+            "The number of live datanodes %d needs an additional %d live "
+            + "datanodes to reach the minimum number %d.",
+            numLive, datanodeThreshold - numLive, datanodeThreshold);
+        }
+        msg += " " + leaveMsg;
       } else {
         msg = String.format("The reported blocks %d has reached the threshold"
-            + " %.4f of total blocks %d. %s", blockSafe, threshold, 
-            blockTotal, leaveMsg);
+            + " %.4f of total blocks %d.", blockSafe, threshold, 
+            blockTotal);
+
+        if (datanodeThreshold > 0) {
+          msg += String.format(" The number of live datanodes %d has reached "
+                               + "the minimum number %d.",
+                               numLive, datanodeThreshold);
+        }
+        msg += " " + leaveMsg;
       }
       if(reached == 0 || isManual()) {  // threshold is not reached or manual       
         return msg + ".";

+ 68 - 31
src/test/hdfs/org/apache/hadoop/hdfs/TestAbandonBlock.java

@@ -23,49 +23,86 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 
-public class TestAbandonBlock extends junit.framework.TestCase {
+import static org.junit.Assert.*;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test abandoning blocks, which clients do on pipeline creation failure.
+ */
+public class TestAbandonBlock {
   public static final Log LOG = LogFactory.getLog(TestAbandonBlock.class);
   
   private static final Configuration CONF = new HdfsConfiguration();
   static final String FILE_NAME_PREFIX
       = "/" + TestAbandonBlock.class.getSimpleName() + "_"; 
+  private MiniDFSCluster cluster;
+  private FileSystem fs;
 
-  public void testAbandonBlock() throws IOException {
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(2).build();
-    FileSystem fs = cluster.getFileSystem();
+  @Before
+  public void setUp() throws Exception {
+    cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(2).build();
+    fs = cluster.getFileSystem();
+    cluster.waitActive();
+  }
 
+  @After
+  public void tearDown() throws Exception {
+    fs.close();
+    cluster.shutdown();
+  }
+
+  @Test
+  /** Abandon a block while creating a file */
+  public void testAbandonBlock() throws IOException {
     String src = FILE_NAME_PREFIX + "foo";
-    FSDataOutputStream fout = null;
-    try {
-      //start writing a a file but not close it
-      fout = fs.create(new Path(src), true, 4096, (short)1, 512L);
-      for(int i = 0; i < 1024; i++) {
-        fout.write(123);
-      }
-      fout.hflush();
-  
-      //try reading the block by someone
-      final DFSClient dfsclient = new DFSClient(NameNode.getAddress(CONF), CONF);
-      LocatedBlocks blocks = dfsclient.getNamenode().getBlockLocations(src, 0, 1);
-      LocatedBlock b = blocks.get(0); 
-      try {
-        dfsclient.getNamenode().abandonBlock(b.getBlock(), src, "someone");
-        //previous line should throw an exception.
-        assertTrue(false);
-      }
-      catch(IOException ioe) {
-        LOG.info("GREAT! " + StringUtils.stringifyException(ioe));
-      }
+
+    // Start writing a file but do not close it
+    FSDataOutputStream fout = fs.create(new Path(src), true, 4096, (short)1, 512L);
+    for (int i = 0; i < 1024; i++) {
+      fout.write(123);
+    }
+    fout.hflush();
+
+    // Now abandon the last block
+    DFSClient dfsclient = ((DistributedFileSystem)fs).getClient();
+    LocatedBlocks blocks = dfsclient.getNamenode().getBlockLocations(src, 0, 1);
+    LocatedBlock b = blocks.getLastLocatedBlock();
+    dfsclient.getNamenode().abandonBlock(b.getBlock(), src, dfsclient.clientName);
+
+    // And close the file
+    fout.close();
+  }
+
+  @Test
+  /** Make sure that the quota is decremented correctly when a block is abandoned */
+  public void testQuotaUpdatedWhenBlockAbandoned() throws IOException {
+    DistributedFileSystem dfs = (DistributedFileSystem)fs;
+    // Setting diskspace quota to 3MB
+    dfs.setQuota(new Path("/"), FSConstants.QUOTA_DONT_SET, 3 * 1024 * 1024);
+
+    // Start writing a file with 2 replicas to ensure each datanode has one.
+    // Block Size is 1MB.
+    String src = FILE_NAME_PREFIX + "test_quota1";
+    FSDataOutputStream fout = fs.create(new Path(src), true, 4096, (short)2, 1024 * 1024);
+    for (int i = 0; i < 1024; i++) {
+      fout.writeByte(123);
     }
-    finally {
-      try{fout.close();} catch(Exception e) {}
-      try{fs.close();} catch(Exception e) {}
-      try{cluster.shutdown();} catch(Exception e) {}
+
+    // Shutdown one datanode, causing the block abandonment.
+    cluster.getDataNodes().get(0).shutdown();
+
+    // Close the file, new block will be allocated with 2MB pending size.
+    try {
+      fout.close();
+    } catch (QuotaExceededException e) {
+      fail("Unexpected quota exception when closing fout");
     }
   }
 }

+ 159 - 0
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSafeMode.java

@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import junit.framework.TestCase;
+
+/**
+ * Tests to verify safe mode correctness.
+ */
+public class TestSafeMode extends TestCase {
+  
+  static Log LOG = LogFactory.getLog(TestSafeMode.class);
+
+  /**
+   * This test verifies that if SafeMode is manually entered, name-node does not
+   * come out of safe mode even after the startup safe mode conditions are met.
+   * <ol>
+   * <li>Start cluster with 1 data-node.</li>
+   * <li>Create 2 files with replication 1.</li>
+   * <li>Re-start cluster with 0 data-nodes. 
+   * Name-node should stay in automatic safe-mode.</li>
+   * <li>Enter safe mode manually.</li>
+   * <li>Start the data-node.</li>
+   * <li>Wait longer than <tt>dfs.namenode.safemode.extension</tt> and
+   * verify that the name-node is still in safe mode.</li>
+   * </ol>
+   *  
+   * @throws IOException
+   */
+  public void testManualSafeMode() throws IOException {
+    MiniDFSCluster cluster = null;
+    DistributedFileSystem fs = null;
+    try {
+      Configuration conf = new HdfsConfiguration();
+      // disable safemode extension to make the test run faster.
+      conf.set(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, "1");
+      cluster = new MiniDFSCluster.Builder(conf).build();
+      cluster.waitActive();
+      
+      fs = (DistributedFileSystem)cluster.getFileSystem();
+      Path file1 = new Path("/tmp/testManualSafeMode/file1");
+      Path file2 = new Path("/tmp/testManualSafeMode/file2");
+      
+      LOG.info("Created file1 and file2.");
+      
+      // create two files with one block each.
+      DFSTestUtil.createFile(fs, file1, 1000, (short)1, 0);
+      DFSTestUtil.createFile(fs, file2, 2000, (short)1, 0);
+      fs.close();
+      cluster.shutdown();
+      
+      // now bring up just the NameNode.
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).format(false).build();
+      cluster.waitActive();
+      fs = (DistributedFileSystem)cluster.getFileSystem();
+      
+      LOG.info("Restarted cluster with just the NameNode");
+      
+      assertTrue("No datanode is started. Should be in SafeMode", 
+                 fs.setSafeMode(SafeModeAction.SAFEMODE_GET));
+      
+      // manually set safemode.
+      fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+      
+      // now bring up the datanode and wait for it to be active.
+      cluster.startDataNodes(conf, 1, true, null, null);
+      cluster.waitActive();
+      
+      LOG.info("Datanode is started.");
+
+      // wait longer than dfs.safemode.extension
+      try {
+        Thread.sleep(2000);
+      } catch (InterruptedException ignored) {}
+      
+      assertTrue("should still be in SafeMode",
+          fs.setSafeMode(SafeModeAction.SAFEMODE_GET));
+      
+      fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
+      assertFalse("should not be in SafeMode",
+          fs.setSafeMode(SafeModeAction.SAFEMODE_GET));
+    } finally {
+      if(fs != null) fs.close();
+      if(cluster!= null) cluster.shutdown();
+    }
+  }
+
+
+  /**
+   * Verify that the NameNode stays in safemode when
+   * dfs.namenode.safemode.min.datanodes exceeds the number of live datanodes.
+   */
+  public void testDatanodeThreshold() throws IOException {
+    MiniDFSCluster cluster = null;
+    DistributedFileSystem fs = null;
+    try {
+      Configuration conf = new Configuration();
+      conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 0);
+      conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY, 1);
+
+      // bring up a cluster with no datanodes
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).format(true).build();
+      cluster.waitActive();
+      fs = (DistributedFileSystem)cluster.getFileSystem();
+
+      assertTrue("No datanode started, but we require one - safemode expected",
+                 fs.setSafeMode(SafeModeAction.SAFEMODE_GET));
+
+      String tipMsg = cluster.getNamesystem().getSafeModeTip();
+      assertTrue("Safemode tip message looks right",
+                 tipMsg.contains("The number of live datanodes 0 needs an " +
+                                 "additional 1 live"));
+
+      // Start a datanode
+      cluster.startDataNodes(conf, 1, true, null, null);
+
+      // Wait long enough for safemode check to refire
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException ignored) {}
+
+      // We now should be out of safe mode.
+      assertFalse(
+        "Out of safe mode after starting datanode.",
+        fs.setSafeMode(SafeModeAction.SAFEMODE_GET));
+    } finally {
+      if (fs != null) fs.close();
+      if (cluster != null) cluster.shutdown();
+    }
+  }
+}