浏览代码

svn merge -c 1325531 Merging from trunk to branch-0.23 to fix HDFS-3256.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1455991 13f79535-47bb-0310-9956-ffa450edef68
Kihwal Lee 12 年之前
父节点
当前提交
faa2bbce46

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -78,6 +78,9 @@ Release 0.23.7 - UNRELEASED
     HDFS-4577. Webhdfs operations should declare if authentication is required
     (daryn via kihwal)
 
+    HDFS-3256. HDFS considers blocks under-replicated if topology script is
+    configured with only 1 rack. (atm)
+
 Release 0.23.6 - 2013-02-06
 
   INCOMPATIBLE CHANGES

+ 4 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -235,8 +235,7 @@ public class BlockManager {
             DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
             DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT);
     this.shouldCheckForEnoughRacks =
-        conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) == null
-            ? false : true;
+        conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) != null;
     
     this.replicationRecheckInterval = 
       conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 
@@ -2534,7 +2533,9 @@ public class BlockManager {
       DatanodeDescriptor cur = it.next();
       if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
         if ((corruptNodes == null ) || !corruptNodes.contains(cur)) {
-          if (numExpectedReplicas == 1) {
+          if (numExpectedReplicas == 1 ||
+              (numExpectedReplicas > 1 &&
+                  !datanodeManager.hasClusterEverBeenMultiRack())) {
             enoughRacks = true;
             break;
           }

+ 45 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

@@ -70,6 +70,8 @@ import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.util.ReflectionUtils;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Manage datanodes, include decommission and other activities.
  */
@@ -123,6 +125,12 @@ public class DatanodeManager {
   /** Ask Datanode only up to this many blocks to delete. */
   final int blockInvalidateLimit;
   
+  /**
+   * Whether or not this cluster has ever consisted of more than 1 rack,
+   * according to the NetworkTopology.
+   */
+  private boolean hasClusterEverBeenMultiRack = false;
+  
   DatanodeManager(final BlockManager blockManager,
       final Namesystem namesystem, final Configuration conf
       ) throws IOException {
@@ -328,6 +336,7 @@ public class DatanodeManager {
 
     host2DatanodeMap.add(node);
     networktopology.add(node);
+    checkIfClusterIsNowMultiRack(node);
 
     if (LOG.isDebugEnabled()) {
       LOG.debug(getClass().getSimpleName() + ".addDatanode: "
@@ -766,6 +775,42 @@ public class DatanodeManager {
     }
   }
 
+  /**
+   * @return true if this cluster has ever consisted of multiple racks, even if
+   *         it is not now a multi-rack cluster.
+   */
+  boolean hasClusterEverBeenMultiRack() {
+    return hasClusterEverBeenMultiRack;
+  }
+
+  /**
+   * Check if the cluster now consists of multiple racks. If it does, and this
+   * is the first time it's consisted of multiple racks, then process blocks
+   * that may now be misreplicated.
+   * 
+   * @param node DN which caused cluster to become multi-rack. Used for logging.
+   */
+  @VisibleForTesting
+  void checkIfClusterIsNowMultiRack(DatanodeDescriptor node) {
+    if (!hasClusterEverBeenMultiRack && networktopology.getNumOfRacks() > 1) {
+      String message = "DN " + node + " joining cluster has expanded a formerly " +
+          "single-rack cluster to be multi-rack. ";
+      if (namesystem.isPopulatingReplQueues()) {
+        message += "Re-checking all blocks for replication, since they should " +
+            "now be replicated cross-rack";
+        LOG.info(message);
+      } else {
+        message += "Not checking for mis-replicated blocks because this NN is " +
+            "not yet processing repl queues.";
+        LOG.debug(message);
+      }
+      hasClusterEverBeenMultiRack = true;
+      if (namesystem.isPopulatingReplQueues()) {
+        blockManager.processMisReplicatedBlocks();
+      }
+    }
+  }
+
   /** For generating datanode reports */
   public List<DatanodeDescriptor> getDatanodeListForReport(
       final DatanodeReportType type) {

+ 27 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java

@@ -96,6 +96,7 @@ public class TestBlockManager {
       dn.updateHeartbeat(
           2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
           2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
+      bm.getDatanodeManager().checkIfClusterIsNowMultiRack(dn);
     }
   }
 
@@ -314,6 +315,32 @@ public class TestBlockManager {
         rackB.contains(pipeline[1]));
   }
   
+  @Test
+  public void testBlocksAreNotUnderreplicatedInSingleRack() throws Exception {
+    List<DatanodeDescriptor> nodes = ImmutableList.of( 
+        new DatanodeDescriptor(new DatanodeID("h1:5020"), "/rackA"),
+        new DatanodeDescriptor(new DatanodeID("h2:5020"), "/rackA"),
+        new DatanodeDescriptor(new DatanodeID("h3:5020"), "/rackA"),
+        new DatanodeDescriptor(new DatanodeID("h4:5020"), "/rackA"),
+        new DatanodeDescriptor(new DatanodeID("h5:5020"), "/rackA"),
+        new DatanodeDescriptor(new DatanodeID("h6:5020"), "/rackA")
+      );
+    addNodes(nodes);
+    List<DatanodeDescriptor> origNodes = nodes.subList(0, 3);;
+    for (int i = 0; i < NUM_TEST_ITERS; i++) {
+      doTestSingleRackClusterIsSufficientlyReplicated(i, origNodes);
+    }
+  }
+  
+  private void doTestSingleRackClusterIsSufficientlyReplicated(int testIndex,
+      List<DatanodeDescriptor> origNodes)
+      throws Exception {
+    assertEquals(0, bm.numOfUnderReplicatedBlocks());
+    addBlockOnNodes((long)testIndex, origNodes);
+    bm.processMisReplicatedBlocks();
+    assertEquals(0, bm.numOfUnderReplicatedBlocks());
+  }
+  
   
   /**
    * Tell the block manager that replication is completed for the given

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java

@@ -97,7 +97,7 @@ public class TestBlocksWithNotEnoughRacks {
       final FileSystem fs = cluster.getFileSystem();
       DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
       ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, filePath);
-      DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 1);
+      DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 0);
 
       // Add a new datanode on a different rack
       String newRacks[] = {"/rack2"};
@@ -165,7 +165,7 @@ public class TestBlocksWithNotEnoughRacks {
       final FileSystem fs = cluster.getFileSystem();
       DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
       ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, filePath);
-      DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 1);
+      DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 0);
       
       // Add new datanodes on a different rack and increase the
       // replication factor so the block is underreplicated and make