
HDFS-5837. dfs.namenode.replication.considerLoad should consider decommissioned nodes. Contributed by Tao Luo.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.3@1566415 13f79535-47bb-0310-9956-ffa450edef68
Konstantin Shvachko, 11 years ago
commit 4beba0b847

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -538,6 +538,9 @@ Release 2.3.0 - UNRELEASED
     HDFS-5873. dfs.http.policy should have higher precedence over dfs.https.enable.
     (Haohui Mai via jing9)
 
+    HDFS-5837. dfs.namenode.replication.considerLoad should consider
+    decommissioned nodes. (Tao Luo via shv)
+
   BREAKDOWN OF HDFS-2832 SUBTASKS AND RELATED JIRAS
 
     HDFS-4985. Add storage type to the protocol and expose it in block report

+ 5 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java

@@ -633,9 +633,11 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     // check the communication traffic of the target machine
     if (considerLoad) {
       double avgLoad = 0;
-      int size = clusterMap.getNumOfLeaves();
-      if (size != 0 && stats != null) {
-        avgLoad = (double)stats.getTotalLoad()/size;
+      if (stats != null) {
+        int size = stats.getNumDatanodesInService();
+        if (size != 0) {
+          avgLoad = (double)stats.getTotalLoad()/size;
+        }
       }
       if (node.getXceiverCount() > (2.0 * avgLoad)) {
         logNodeIsNotChosen(storage, "the node is too busy ");
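
For context: the old code divided the cluster's total xceiver count by clusterMap.getNumOfLeaves(), which still counts decommissioned nodes, so the computed average load is deflated and lightly loaded in-service nodes can be rejected as "too busy". The standalone sketch below is illustrative only; it is not part of the patch, and the class name and numbers are hypothetical, chosen to match the scenario in the new test. It shows how changing the denominator affects the threshold check.

// Illustrative sketch, not HDFS code. Numbers mirror the new test:
// total load 10 (xceiver counts 2 + 4 + 4), 6 registered nodes, 3 in service.
public class ConsiderLoadSketch {

  // Mirrors the check in BlockPlacementPolicyDefault: a target is rejected
  // when its xceiver count exceeds twice the cluster's average load.
  static boolean isTooBusy(int nodeXceivers, int totalLoad, int denominator) {
    double avgLoad = (denominator != 0) ? (double) totalLoad / denominator : 0;
    return nodeXceivers > 2.0 * avgLoad;
  }

  public static void main(String[] args) {
    int totalLoad = 10;   // sum of xceiver counts reported by live nodes
    int allLeaves = 6;    // clusterMap.getNumOfLeaves(): includes 3 decommissioned nodes
    int inService = 3;    // stats.getNumDatanodesInService(): live and not decommissioned

    // Old denominator: avgLoad = 10/6 ~ 1.67, threshold ~ 3.33,
    // so a node serving 4 xceivers is rejected as "too busy".
    System.out.println(isTooBusy(4, totalLoad, allLeaves));  // true

    // New denominator: avgLoad = 10/3 ~ 3.33, threshold ~ 6.67,
    // so the same node is accepted.
    System.out.println(isTooBusy(4, totalLoad, inService));  // false
  }
}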

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java

@@ -42,6 +42,12 @@ public interface FSClusterStats {
    *         for writing targets, and false otherwise.
    */
   public boolean isAvoidingStaleDataNodesForWrite();
+
+  /**
+   * Indicates number of datanodes that are in service.
+   * @return Number of datanodes that are both alive and not decommissioned.
+   */
+  public int getNumDatanodesInService();
 }

+ 6 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -6801,7 +6801,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     return this.blockManager.getDatanodeManager()
         .shouldAvoidStaleDataNodesForWrite();
   }
-  
+
+  @Override // FSClusterStats
+  public int getNumDatanodesInService() {
+    return getNumLiveDataNodes() - getNumDecomLiveDataNodes();
+  }
+
   public SnapshotManager getSnapshotManager() {
     return snapshotManager;
   }
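
In the FSNamesystem implementation above, the in-service count is derived from existing metrics: live datanodes minus live-but-decommissioned datanodes. In the test below, 6 datanodes are registered and 3 of them are marked decommissioned, so getNumDatanodesInService() returns 6 - 3 = 3, and that value becomes the denominator of the average-load calculation in BlockPlacementPolicyDefault.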

+ 160 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java

@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.test.PathUtils;
+import org.apache.hadoop.util.VersionInfo;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestReplicationPolicyConsiderLoad {
+
+  private static NameNode namenode;
+  private static DatanodeManager dnManager;
+  private static List<DatanodeRegistration> dnrList;
+  private static DatanodeDescriptor[] dataNodes;
+  private static DatanodeStorageInfo[] storages;
+
+  @BeforeClass
+  public static void setupCluster() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    final String[] racks = {
+        "/rack1",
+        "/rack1",
+        "/rack1",
+        "/rack2",
+        "/rack2",
+        "/rack2"};
+    storages = DFSTestUtil.createDatanodeStorageInfos(racks);
+    dataNodes = DFSTestUtil.toDatanodeDescriptor(storages);
+    FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
+    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
+    File baseDir = PathUtils.getTestDir(TestReplicationPolicy.class);
+    conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+        new File(baseDir, "name").getPath());
+    conf.setBoolean(
+        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
+    conf.setBoolean(
+        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, true);
+    conf.setBoolean(
+        DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true);
+    DFSTestUtil.formatNameNode(conf);
+    namenode = new NameNode(conf);
+    int blockSize = 1024;
+
+    dnrList = new ArrayList<DatanodeRegistration>();
+    dnManager = namenode.getNamesystem().getBlockManager().getDatanodeManager();
+
+    // Register DNs
+    for (int i=0; i < 6; i++) {
+      DatanodeRegistration dnr = new DatanodeRegistration(dataNodes[i],
+          new StorageInfo(), new ExportedBlockKeys(), VersionInfo.getVersion());
+      dnrList.add(dnr);
+      dnManager.registerDatanode(dnr);
+      dataNodes[i].getStorageInfos()[0].setUtilizationForTesting(
+          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*blockSize, 0L,
+          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*blockSize, 0L);
+      dataNodes[i].updateHeartbeat(
+          BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[i]),
+          0L, 0L, 0, 0);
+    }
+  }
+
+  /**
+   * Tests that chooseTarget with considerLoad set to true correctly calculates
+   * load with decommissioned nodes.
+   */
+  @Test
+  public void testChooseTargetWithDecomNodes() throws IOException {
+    namenode.getNamesystem().writeLock();
+    try {
+      // Decommission DNs so BlockPlacementPolicyDefault.isGoodTarget()
+      // returns false
+      for (int i = 0; i < 3; i++) {
+        DatanodeInfo d = dnManager.getDatanodeByXferAddr(
+            dnrList.get(i).getIpAddr(),
+            dnrList.get(i).getXferPort());
+        d.setDecommissioned();
+      }
+      String blockPoolId = namenode.getNamesystem().getBlockPoolId();
+      dnManager.handleHeartbeat(dnrList.get(3),
+          BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[3]),
+          blockPoolId, dataNodes[3].getCacheCapacity(),
+          dataNodes[3].getCacheRemaining(),
+          2, 0, 0);
+      dnManager.handleHeartbeat(dnrList.get(4),
+          BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[4]),
+          blockPoolId, dataNodes[4].getCacheCapacity(),
+          dataNodes[4].getCacheRemaining(),
+          4, 0, 0);
+      dnManager.handleHeartbeat(dnrList.get(5),
+          BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[5]),
+          blockPoolId, dataNodes[5].getCacheCapacity(),
+          dataNodes[5].getCacheRemaining(),
+          4, 0, 0);
+
+      // Call chooseTarget()
+      DatanodeStorageInfo[] targets = namenode.getNamesystem().getBlockManager()
+          .getBlockPlacementPolicy().chooseTarget("testFile.txt", 3,
+              dataNodes[0], new ArrayList<DatanodeStorageInfo>(), false, null,
+              1024, StorageType.DEFAULT);
+
+      assertEquals(3, targets.length);
+      Set<DatanodeStorageInfo> targetSet = new HashSet<DatanodeStorageInfo>(
+          Arrays.asList(targets));
+      for (int i = 3; i < storages.length; i++) {
+        assertTrue(targetSet.contains(storages[i]));
+      }
+    } finally {
+      dataNodes[0].stopDecommission();
+      dataNodes[1].stopDecommission();
+      dataNodes[2].stopDecommission();
+      namenode.getNamesystem().writeUnlock();
+    }
+  }
+
+  @AfterClass
+  public static void teardownCluster() {
+    if (namenode != null) namenode.stop();
+  }
+
+}