浏览代码

HDFS-4351. In BlockPlacementPolicyDefault.chooseTarget(..), numOfReplicas needs to be updated when avoiding stale nodes. Contributed by Andrew Wang

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1@1429655 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 12 年之前
父节点
当前提交
0e14e5ca1b

+ 3 - 0
CHANGES.txt

@@ -404,6 +404,9 @@ Release 1.2.0 - unreleased
     MAPREDUCE-4915. TestShuffleExceptionCount fails with open JDK7.
     MAPREDUCE-4915. TestShuffleExceptionCount fails with open JDK7.
     (Brandon Li via suresh)
     (Brandon Li via suresh)
 
 
+    HDFS-4351.  In BlockPlacementPolicyDefault.chooseTarget(..), numOfReplicas
+    needs to be updated when avoiding stale nodes.  (Andrew Wang via szetszwo)
+
 Release 1.1.2 - Unreleased
 Release 1.1.2 - Unreleased
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES

+ 13 - 5
src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java

@@ -161,6 +161,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
     if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
       return writer;
       return writer;
     }
     }
+    int totalReplicasExpected = numOfReplicas + results.size();
       
       
     int numOfResults = results.size();
     int numOfResults = results.size();
     boolean newBlock = (numOfResults==0);
     boolean newBlock = (numOfResults==0);
@@ -206,15 +207,22 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
           maxNodesPerRack, results, avoidStaleNodes);
           maxNodesPerRack, results, avoidStaleNodes);
     } catch (NotEnoughReplicasException e) {
     } catch (NotEnoughReplicasException e) {
       FSNamesystem.LOG.warn("Not able to place enough replicas, still in need of "
       FSNamesystem.LOG.warn("Not able to place enough replicas, still in need of "
-               + numOfReplicas);
+               + (totalReplicasExpected - results.size()) + " to reach "
+               + totalReplicasExpected + "\n"
+               + e.getMessage());
       if (avoidStaleNodes) {
       if (avoidStaleNodes) {
-        // excludedNodes now has - initial excludedNodes, any nodes that were
-        // chosen and nodes that were tried but were not chosen because they
-        // were stale, decommissioned or for any other reason a node is not
-        // chosen for write. Retry again now not avoiding stale node
+        // Retry chooseTarget again, this time not avoiding stale nodes.
+
+        // excludedNodes contains the initial excludedNodes and nodes that were
+        // not chosen because they were stale, decommissioned, etc.
+        // We need to additionally exclude the nodes that were added to the 
+        // result list in the successful calls to choose*() above.
         for (Node node : results) {
         for (Node node : results) {
           oldExcludedNodes.put(node, node);
           oldExcludedNodes.put(node, node);
         }
         }
+        // Set numOfReplicas, since it can get out of sync with the result list
+        // if the NotEnoughReplicasException was thrown in chooseRandom().
+        numOfReplicas = totalReplicasExpected - results.size();
         return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
         return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
             maxNodesPerRack, results, false);
             maxNodesPerRack, results, false);
       }
       }

+ 82 - 0
src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java

@@ -38,6 +38,10 @@ import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.Node;
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
 
 
 public class TestReplicationPolicy extends TestCase {
 public class TestReplicationPolicy extends TestCase {
   private static final int BLOCK_SIZE = 1024;
   private static final int BLOCK_SIZE = 1024;
@@ -341,6 +345,84 @@ public class TestReplicationPolicy extends TestCase {
     return false;
     return false;
   }
   }
 
 
+  /**
+   * In this testcase, it tries to choose more targets than available nodes and
+   * check the result, with stale node avoidance on the write path enabled.
+   * @throws Exception
+   */
+  public void testChooseTargetWithMoreThanAvailableNodesWithStaleness()
+      throws Exception {
+    try {
+      namenode.getNamesystem().setAvoidStaleDataNodesForWrite(true);
+      testChooseTargetWithMoreThanAvailableNodes();
+    } finally {
+      namenode.getNamesystem().setAvoidStaleDataNodesForWrite(false);
+    }
+  }
+  
+  /**
+   * In this testcase, it tries to choose more targets than available nodes and
+   * check the result. 
+   * @throws Exception
+   */
+  public void testChooseTargetWithMoreThanAvailableNodes() throws Exception {
+    // make data node 0 & 1 to be not qualified to choose: not enough disk space
+    for(int i=0; i<2; i++) {
+      dataNodes[i].updateHeartbeat(
+          2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+          (FSConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0);
+    }
+    
+    final TestAppender appender = new TestAppender();
+    final Logger logger = Logger.getRootLogger();
+    logger.addAppender(appender);
+    
+    // try to choose NUM_OF_DATANODES which is more than actually available
+    // nodes.
+    DatanodeDescriptor[] targets = replicator.chooseTarget(filename, 
+        NUM_OF_DATANODES, dataNodes[0], new ArrayList<DatanodeDescriptor>(),
+        BLOCK_SIZE);
+    assertEquals(targets.length, NUM_OF_DATANODES - 2);
+
+    final List<LoggingEvent> log = appender.getLog();
+    assertNotNull(log);
+    assertFalse(log.size() == 0);
+    final LoggingEvent lastLogEntry = log.get(log.size() - 1);
+    
+    assertEquals(lastLogEntry.getLevel(), Level.WARN);
+    // Suppose to place replicas on each node but two data nodes are not
+    // available for placing replica, so here we expect a short of 2
+    assertTrue(((String)lastLogEntry.getMessage()).contains("in need of 2"));
+    
+    for(int i=0; i<2; i++) {
+      dataNodes[i].updateHeartbeat(
+          2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+          FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0);
+    }
+  }
+  
+  class TestAppender extends AppenderSkeleton {
+    private final List<LoggingEvent> log = new ArrayList<LoggingEvent>();
+
+    @Override
+    public boolean requiresLayout() {
+      return false;
+    }
+
+    @Override
+    protected void append(final LoggingEvent loggingEvent) {
+      log.add(loggingEvent);
+    }
+
+    @Override
+    public void close() {
+    }
+
+    public List<LoggingEvent> getLog() {
+      return new ArrayList<LoggingEvent>(log);
+    }
+  }
+
   public void testChooseTargetWithStaleNodes() throws Exception {
   public void testChooseTargetWithStaleNodes() throws Exception {
     // Enable avoiding writing to stale DataNodes
     // Enable avoiding writing to stale DataNodes
     namenode.getNamesystem().setAvoidStaleDataNodesForWrite(true);
     namenode.getNamesystem().setAvoidStaleDataNodesForWrite(true);