
HDFS-9015. Refactor TestReplicationPolicy to test different block placement policies. (Ming Ma via lei)

Lei Xu, 9 years ago
Commit a68b6eb0f4

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -1010,6 +1010,9 @@ Release 2.8.0 - UNRELEASED
     'CredentialBasedAccessTokenProvider.getCredential()' abstract methods to
     public (Santhosh Nayak via cnauroth)
 
+    HDFS-9015. Refactor TestReplicationPolicy to test different block placement
+    policies. (Ming Ma via lei)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

+ 160 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BaseReplicationPolicyTest.java

@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.PathUtils;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Before;
+
+abstract public class BaseReplicationPolicyTest {
+  {
+    GenericTestUtils.setLogLevel(BlockPlacementPolicy.LOG, Level.ALL);
+  }
+
+  protected NetworkTopology cluster;
+  protected DatanodeDescriptor dataNodes[];
+  protected static final int BLOCK_SIZE = 1024;
+  protected NameNode namenode;
+  protected DatanodeManager dnManager;
+  protected BlockPlacementPolicy replicator;
+  protected final String filename = "/dummyfile.txt";
+  protected DatanodeStorageInfo[] storages;
+  protected String blockPlacementPolicy;
+  protected NamenodeProtocols nameNodeRpc = null;
+
+  static void updateHeartbeatWithUsage(DatanodeDescriptor dn,
+    long capacity, long dfsUsed, long remaining, long blockPoolUsed,
+    long dnCacheCapacity, long dnCacheUsed, int xceiverCount,
+    int volFailures) {
+    dn.getStorageInfos()[0].setUtilizationForTesting(
+        capacity, dfsUsed, remaining, blockPoolUsed);
+    dn.updateHeartbeat(
+        BlockManagerTestUtil.getStorageReportsForDatanode(dn),
+        dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures, null);
+  }
+
+  abstract DatanodeDescriptor[] getDatanodeDescriptors(Configuration conf);
+
+  @Before
+  public void setupCluster() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    dataNodes = getDatanodeDescriptors(conf);
+
+    FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
+    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
+    File baseDir = PathUtils.getTestDir(TestReplicationPolicy.class);
+    conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+        new File(baseDir, "name").getPath());
+    conf.set(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        blockPlacementPolicy);
+    conf.setBoolean(
+        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
+    conf.setBoolean(
+        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, true);
+    DFSTestUtil.formatNameNode(conf);
+    namenode = new NameNode(conf);
+    nameNodeRpc = namenode.getRpcServer();
+
+    final BlockManager bm = namenode.getNamesystem().getBlockManager();
+    replicator = bm.getBlockPlacementPolicy();
+    cluster = bm.getDatanodeManager().getNetworkTopology();
+    dnManager = bm.getDatanodeManager();
+    // construct network topology
+    for (int i=0; i < dataNodes.length; i++) {
+      cluster.add(dataNodes[i]);
+      //bm.getDatanodeManager().getHost2DatanodeMap().add(dataNodes[i]);
+      bm.getDatanodeManager().getHeartbeatManager().addDatanode(
+          dataNodes[i]);
+    }
+    updateHeartbeatWithUsage();
+  }
+
+  void updateHeartbeatWithUsage() {
+    for (int i=0; i < dataNodes.length; i++) {
+      updateHeartbeatWithUsage(dataNodes[i],
+          2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+          2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
+    }
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    namenode.stop();
+  }
+
+  boolean isOnSameRack(DatanodeStorageInfo left, DatanodeStorageInfo right) {
+    return isOnSameRack(left, right.getDatanodeDescriptor());
+  }
+
+  boolean isOnSameRack(DatanodeStorageInfo left, DatanodeDescriptor right) {
+    return cluster.isOnSameRack(left.getDatanodeDescriptor(), right);
+  }
+
+  DatanodeStorageInfo[] chooseTarget(int numOfReplicas) {
+    return chooseTarget(numOfReplicas, dataNodes[0]);
+  }
+
+  DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
+      DatanodeDescriptor writer) {
+    return chooseTarget(numOfReplicas, writer,
+        new ArrayList<DatanodeStorageInfo>());
+  }
+
+  DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
+      List<DatanodeStorageInfo> chosenNodes) {
+    return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes);
+  }
+
+  DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
+      DatanodeDescriptor writer, List<DatanodeStorageInfo> chosenNodes) {
+    return chooseTarget(numOfReplicas, writer, chosenNodes, null);
+  }
+
+  DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
+      List<DatanodeStorageInfo> chosenNodes, Set<Node> excludedNodes) {
+    return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes,
+        excludedNodes);
+  }
+
+  DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
+     DatanodeDescriptor writer, List<DatanodeStorageInfo> chosenNodes,
+     Set<Node> excludedNodes) {
+    return replicator.chooseTarget(filename, numOfReplicas, writer,
+        chosenNodes, false, excludedNodes, BLOCK_SIZE,
+        TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
+  }
+}
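
The new base class owns all of the shared machinery: namenode setup and teardown, heartbeat helpers, rack-awareness checks, and the chooseTarget() overloads. A concrete test only supplies a placement-policy class name and a datanode topology. The sketch below is not part of this commit; the class name, rack layout, and test method are illustrative only, but they follow the same pattern the refactored tests below use.

package org.apache.hadoop.hdfs.server.blockmanagement;

import static org.junit.Assert.assertEquals;

import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

// Illustrative only: runs the suite against BlockPlacementPolicyDefault
// through the new base class.
@RunWith(Parameterized.class)
public class TestMyPlacementPolicy extends BaseReplicationPolicyTest {

  public TestMyPlacementPolicy(String blockPlacementPolicyClassName) {
    // The base class reads this field in setupCluster() and passes it to
    // DFS_BLOCK_REPLICATOR_CLASSNAME_KEY.
    this.blockPlacementPolicy = blockPlacementPolicyClassName;
  }

  @Parameterized.Parameters
  public static Iterable<Object[]> data() {
    return Arrays.asList(new Object[][] {
        { BlockPlacementPolicyDefault.class.getName() } });
  }

  @Override
  DatanodeDescriptor[] getDatanodeDescriptors(Configuration conf) {
    // Each subclass defines its own rack topology; the base class registers
    // these nodes with the DatanodeManager and seeds their heartbeats.
    final String[] racks = { "/d1/r1", "/d1/r1", "/d1/r2", "/d2/r3" };
    storages = DFSTestUtil.createDatanodeStorageInfos(racks);
    return DFSTestUtil.toDatanodeDescriptor(storages);
  }

  @Test
  public void testFirstReplicaPlacement() {
    // The chooseTarget() helpers inherited from the base class delegate to
    // the configured BlockPlacementPolicy.
    DatanodeStorageInfo[] targets = chooseTarget(1);
    assertEquals(1, targets.length);
  }
}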

+ 32 - 116
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java

@@ -26,7 +26,6 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -39,7 +38,6 @@ import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
@@ -55,52 +53,40 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
-import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.test.PathUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.log4j.spi.LoggingEvent;
-import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
-public class TestReplicationPolicy {
-  {
-    GenericTestUtils.setLogLevel(BlockPlacementPolicy.LOG, Level.ALL);
-  }
+@RunWith(Parameterized.class)
+public class TestReplicationPolicy extends BaseReplicationPolicyTest {
 
-  private static final int BLOCK_SIZE = 1024;
-  private static final int NUM_OF_DATANODES = 6;
-  private static NetworkTopology cluster;
-  private static NameNode namenode;
-  private static BlockPlacementPolicy replicator;
   private static final String filename = "/dummyfile.txt";
-  private static DatanodeDescriptor[] dataNodes;
-  private static DatanodeStorageInfo[] storages;
   // The interval for marking a datanode as stale,
   private static final long staleInterval =
       DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT;
 
   @Rule
   public ExpectedException exception = ExpectedException.none();
-  
-  private static void updateHeartbeatWithUsage(DatanodeDescriptor dn,
-    long capacity, long dfsUsed, long remaining, long blockPoolUsed,
-    long dnCacheCapacity, long dnCacheUsed, int xceiverCount, int volFailures) {
-    dn.getStorageInfos()[0].setUtilizationForTesting(
-        capacity, dfsUsed, remaining, blockPoolUsed);
-    dn.updateHeartbeat(
-        BlockManagerTestUtil.getStorageReportsForDatanode(dn),
-        dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures, null);
+
+  public TestReplicationPolicy(String blockPlacementPolicyClassName) {
+    this.blockPlacementPolicy = blockPlacementPolicyClassName;
   }
 
-  private static void updateHeartbeatForExtraStorage(long capacity,
+  @Parameterized.Parameters
+  public static Iterable<Object[]> data() {
+    return Arrays.asList(new Object[][] {
+        { BlockPlacementPolicyDefault.class.getName() } });
+  }
+
+  private void updateHeartbeatForExtraStorage(long capacity,
       long dfsUsed, long remaining, long blockPoolUsed) {
     DatanodeDescriptor dn = dataNodes[5];
     dn.getStorageInfos()[1].setUtilizationForTesting(
@@ -110,9 +96,19 @@ public class TestReplicationPolicy {
         0L, 0L, 0, 0, null);
   }
 
-  @BeforeClass
-  public static void setupCluster() throws Exception {
-    Configuration conf = new HdfsConfiguration();
+  private void resetHeartbeatForStorages() {
+    for (int i=0; i < dataNodes.length; i++) {
+      updateHeartbeatWithUsage(dataNodes[i],
+          2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+          2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L,
+          0, 0);
+    }
+    // No available space in the extra storage of dn0
+    updateHeartbeatForExtraStorage(0L, 0L, 0L, 0L);
+  }
+
+  @Override
+  DatanodeDescriptor[] getDatanodeDescriptors(Configuration conf) {
     final String[] racks = {
         "/d1/r1",
         "/d1/r1",
@@ -121,59 +117,13 @@ public class TestReplicationPolicy {
         "/d2/r3",
         "/d2/r3"};
     storages = DFSTestUtil.createDatanodeStorageInfos(racks);
-    dataNodes = DFSTestUtil.toDatanodeDescriptor(storages);
-
     // create an extra storage for dn5.
     DatanodeStorage extraStorage = new DatanodeStorage(
         storages[5].getStorageID() + "-extra", DatanodeStorage.State.NORMAL,
         StorageType.DEFAULT);
-/*    DatanodeStorageInfo si = new DatanodeStorageInfo(
-        storages[5].getDatanodeDescriptor(), extraStorage);
-*/
     BlockManagerTestUtil.updateStorage(storages[5].getDatanodeDescriptor(),
         extraStorage);
-
-    FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
-    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
-    File baseDir = PathUtils.getTestDir(TestReplicationPolicy.class);
-    conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
-        new File(baseDir, "name").getPath());
-
-    conf.setBoolean(
-        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
-    conf.setBoolean(
-        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, true);
-    DFSTestUtil.formatNameNode(conf);
-    namenode = new NameNode(conf);
-
-    final BlockManager bm = namenode.getNamesystem().getBlockManager();
-    replicator = bm.getBlockPlacementPolicy();
-    cluster = bm.getDatanodeManager().getNetworkTopology();
-    // construct network topology
-    for (int i=0; i < NUM_OF_DATANODES; i++) {
-      cluster.add(dataNodes[i]);
-      bm.getDatanodeManager().getHeartbeatManager().addDatanode(
-          dataNodes[i]);
-    }
-    resetHeartbeatForStorages();
-  }
-
-  private static void resetHeartbeatForStorages() {
-    for (int i=0; i < NUM_OF_DATANODES; i++) {
-      updateHeartbeatWithUsage(dataNodes[i],
-          2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
-          2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
-    }
-    // No available space in the extra storage of dn0
-    updateHeartbeatForExtraStorage(0L, 0L, 0L, 0L);
-  }
-
-  private static boolean isOnSameRack(DatanodeStorageInfo left, DatanodeStorageInfo right) {
-    return isOnSameRack(left, right.getDatanodeDescriptor());
-  }
-
-  private static boolean isOnSameRack(DatanodeStorageInfo left, DatanodeDescriptor right) {
-    return cluster.isOnSameRack(left.getDatanodeDescriptor(), right);
+    return DFSTestUtil.toDatanodeDescriptor(storages);
   }
 
   /**
@@ -269,40 +219,6 @@ public class TestReplicationPolicy {
     resetHeartbeatForStorages();
   }
 
-  private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas) {
-    return chooseTarget(numOfReplicas, dataNodes[0]);
-  }
-
-  private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
-      DatanodeDescriptor writer) {
-    return chooseTarget(numOfReplicas, writer,
-        new ArrayList<DatanodeStorageInfo>());
-  }
-
-  private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
-      List<DatanodeStorageInfo> chosenNodes) {
-    return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes);
-  }
-
-  private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
-      DatanodeDescriptor writer, List<DatanodeStorageInfo> chosenNodes) {
-    return chooseTarget(numOfReplicas, writer, chosenNodes, null);
-  }
-
-  private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
-      List<DatanodeStorageInfo> chosenNodes, Set<Node> excludedNodes) {
-    return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes, excludedNodes);
-  }
-
-  private static DatanodeStorageInfo[] chooseTarget(
-      int numOfReplicas,
-      DatanodeDescriptor writer,
-      List<DatanodeStorageInfo> chosenNodes,
-      Set<Node> excludedNodes) {
-    return replicator.chooseTarget(filename, numOfReplicas, writer, chosenNodes,
-        false, excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
-  }
-
   /**
    * In this testcase, client is dataNodes[0], but the dataNodes[1] is
    * not allowed to be chosen. So the 1st replica should be
@@ -555,7 +471,7 @@ public class TestReplicationPolicy {
       throws Exception {
     try {
       namenode.getNamesystem().getBlockManager().getDatanodeManager()
-        .setNumStaleNodes(NUM_OF_DATANODES);
+        .setNumStaleNodes(dataNodes.length);
       testChooseTargetWithMoreThanAvailableNodes();
     } finally {
       namenode.getNamesystem().getBlockManager().getDatanodeManager()
@@ -583,8 +499,8 @@ public class TestReplicationPolicy {
     
     // try to choose NUM_OF_DATANODES which is more than actually available
     // nodes.
-    DatanodeStorageInfo[] targets = chooseTarget(NUM_OF_DATANODES);
-    assertEquals(targets.length, NUM_OF_DATANODES - 2);
+    DatanodeStorageInfo[] targets = chooseTarget(dataNodes.length);
+    assertEquals(targets.length, dataNodes.length - 2);
 
     final List<LoggingEvent> log = appender.getLog();
     assertNotNull(log);
@@ -1256,7 +1172,7 @@ public class TestReplicationPolicy {
     // Adding this block will increase its current replication, and that will
     // remove it from the queue.
     bm.addStoredBlockUnderConstruction(new StatefulBlockInfo(info, info,
-              ReplicaState.FINALIZED), TestReplicationPolicy.storages[0]);
+        ReplicaState.FINALIZED), storages[0]);
 
     // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
     // from QUEUE_VERY_UNDER_REPLICATED.
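
Because the suite is now parameterized on the placement-policy class name, covering an additional policy with the exact same test cases would, in principle, only require adding a row to data(). The second entry below is hypothetical and is not registered by this commit; it merely illustrates the extension point.

  @Parameterized.Parameters
  public static Iterable<Object[]> data() {
    return Arrays.asList(new Object[][] {
        { BlockPlacementPolicyDefault.class.getName() },
        // Hypothetical second policy: any BlockPlacementPolicy subclass that
        // can be configured via DFS_BLOCK_REPLICATOR_CLASSNAME_KEY could go here.
        { "org.example.SomeOtherBlockPlacementPolicy" } });
  }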

+ 34 - 87
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java

@@ -20,85 +20,45 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
-import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
-import org.apache.hadoop.hdfs.server.common.StorageInfo;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
-import org.apache.hadoop.test.PathUtils;
-import org.apache.hadoop.util.VersionInfo;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
-public class TestReplicationPolicyConsiderLoad {
+@RunWith(Parameterized.class)
+public class TestReplicationPolicyConsiderLoad
+    extends BaseReplicationPolicyTest {
 
-  private static NameNode namenode;
-  private static DatanodeManager dnManager;
-  private static List<DatanodeRegistration> dnrList;
-  private static DatanodeDescriptor[] dataNodes;
-  private static DatanodeStorageInfo[] storages;
+  public TestReplicationPolicyConsiderLoad(String blockPlacementPolicy) {
+    this.blockPlacementPolicy = blockPlacementPolicy;
+  }
+
+  @Parameterized.Parameters
+  public static Iterable<Object[]> data() {
+    return Arrays.asList(new Object[][] {
+        { BlockPlacementPolicyDefault.class.getName() } });
+  }
 
-  @BeforeClass
-  public static void setupCluster() throws IOException {
-    Configuration conf = new HdfsConfiguration();
+  @Override
+  DatanodeDescriptor[] getDatanodeDescriptors(Configuration conf) {
     final String[] racks = {
-        "/rack1",
         "/rack1",
         "/rack1",
         "/rack2",
         "/rack2",
-        "/rack2"};
+        "/rack3",
+        "/rack3"};
     storages = DFSTestUtil.createDatanodeStorageInfos(racks);
-    dataNodes = DFSTestUtil.toDatanodeDescriptor(storages);
-    FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
-    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
-    File baseDir = PathUtils.getTestDir(TestReplicationPolicy.class);
-    conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
-        new File(baseDir, "name").getPath());
-    conf.setBoolean(
-        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
-    conf.setBoolean(
-        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, true);
-    conf.setBoolean(
-        DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true);
-    DFSTestUtil.formatNameNode(conf);
-    namenode = new NameNode(conf);
-    int blockSize = 1024;
-
-    dnrList = new ArrayList<DatanodeRegistration>();
-    dnManager = namenode.getNamesystem().getBlockManager().getDatanodeManager();
-
-    // Register DNs
-    for (int i=0; i < 6; i++) {
-      DatanodeRegistration dnr = new DatanodeRegistration(dataNodes[i],
-          new StorageInfo(NodeType.DATA_NODE), new ExportedBlockKeys(),
-          VersionInfo.getVersion());
-      dnrList.add(dnr);
-      dnManager.registerDatanode(dnr);
-      dataNodes[i].getStorageInfos()[0].setUtilizationForTesting(
-          2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*blockSize, 0L,
-          2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*blockSize, 0L);
-      dataNodes[i].updateHeartbeat(
-          BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[i]),
-          0L, 0L, 0, 0, null);
-    }
+    return DFSTestUtil.toDatanodeDescriptor(storages);
   }
 
   private final double EPSILON = 0.0001;
@@ -110,46 +70,39 @@ public class TestReplicationPolicyConsiderLoad {
   public void testChooseTargetWithDecomNodes() throws IOException {
     namenode.getNamesystem().writeLock();
     try {
-      String blockPoolId = namenode.getNamesystem().getBlockPoolId();
-      dnManager.handleHeartbeat(dnrList.get(3),
+      dnManager.getHeartbeatManager().updateHeartbeat(dataNodes[3],
           BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[3]),
-          blockPoolId, dataNodes[3].getCacheCapacity(),
-          dataNodes[3].getCacheRemaining(),
-          2, 0, 0, null);
-      dnManager.handleHeartbeat(dnrList.get(4),
+          dataNodes[3].getCacheCapacity(),
+          dataNodes[3].getCacheUsed(),
+          2, 0, null);
+      dnManager.getHeartbeatManager().updateHeartbeat(dataNodes[4],
           BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[4]),
-          blockPoolId, dataNodes[4].getCacheCapacity(),
-          dataNodes[4].getCacheRemaining(),
-          4, 0, 0, null);
-      dnManager.handleHeartbeat(dnrList.get(5),
+          dataNodes[4].getCacheCapacity(),
+          dataNodes[4].getCacheUsed(),
+          4, 0, null);
+      dnManager.getHeartbeatManager().updateHeartbeat(dataNodes[5],
           BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[5]),
-          blockPoolId, dataNodes[5].getCacheCapacity(),
-          dataNodes[5].getCacheRemaining(),
-          4, 0, 0, null);
+          dataNodes[5].getCacheCapacity(),
+          dataNodes[5].getCacheUsed(),
+          4, 0, null);
+
       // value in the above heartbeats
       final int load = 2 + 4 + 4;
       
-      FSNamesystem fsn = namenode.getNamesystem();
       assertEquals((double)load/6, dnManager.getFSClusterStats()
         .getInServiceXceiverAverage(), EPSILON);
       
       // Decommission DNs so BlockPlacementPolicyDefault.isGoodTarget()
       // returns false
       for (int i = 0; i < 3; i++) {
-        DatanodeDescriptor d = dnManager.getDatanode(dnrList.get(i));
+        DatanodeDescriptor d = dataNodes[i];
         dnManager.getDecomManager().startDecommission(d);
         d.setDecommissioned();
       }
       assertEquals((double)load/3, dnManager.getFSClusterStats()
         .getInServiceXceiverAverage(), EPSILON);
 
-      // update references of writer DN to update the de-commissioned state
-      List<DatanodeDescriptor> liveNodes = new ArrayList<DatanodeDescriptor>();
-      dnManager.fetchDatanodes(liveNodes, null, false);
-      DatanodeDescriptor writerDn = null;
-      if (liveNodes.contains(dataNodes[0])) {
-        writerDn = liveNodes.get(liveNodes.indexOf(dataNodes[0]));
-      }
+      DatanodeDescriptor writerDn = dataNodes[0];
 
       // Call chooseTarget()
       DatanodeStorageInfo[] targets = namenode.getNamesystem().getBlockManager()
@@ -171,10 +124,4 @@ public class TestReplicationPolicyConsiderLoad {
     }
     NameNode.LOG.info("Done working on it");
   }
-
-  @AfterClass
-  public static void teardownCluster() {
-    if (namenode != null) namenode.stop();
-  }
-
 }

+ 28 - 133
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java

@@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -32,38 +31,25 @@ import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.NetworkTopologyWithNodeGroup;
 import org.apache.hadoop.net.Node;
-import org.apache.hadoop.test.PathUtils;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 
 
-public class TestReplicationPolicyWithNodeGroup {
-  private static final int BLOCK_SIZE = 1024;
-  private static final int NUM_OF_DATANODES = 8;
-  private static final int NUM_OF_DATANODES_BOUNDARY = 6;
-  private static final int NUM_OF_DATANODES_MORE_TARGETS = 12;
-  private static final int NUM_OF_DATANODES_FOR_DEPENDENCIES = 6;
-  private final Configuration CONF = new HdfsConfiguration();
-  private NetworkTopology cluster;
-  private NameNode namenode;
-  private BlockPlacementPolicy replicator;
-  private static final String filename = "/dummyfile.txt";
-
-  private static final DatanodeStorageInfo[] storages;
-  private static final DatanodeDescriptor[] dataNodes;
-  static {
+public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTest {
+  public TestReplicationPolicyWithNodeGroup() {
+    this.blockPlacementPolicy = BlockPlacementPolicyWithNodeGroup.class.getName();
+  }
+
+  @Override
+  DatanodeDescriptor[] getDatanodeDescriptors(Configuration conf) {
+    conf.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
+            NetworkTopologyWithNodeGroup.class.getName());
     final String[] racks = {
         "/d1/r1/n1",
         "/d1/r1/n1",
@@ -75,7 +61,7 @@ public class TestReplicationPolicyWithNodeGroup {
         "/d2/r3/n6"
     };
     storages = DFSTestUtil.createDatanodeStorageInfos(racks);
-    dataNodes = DFSTestUtil.toDatanodeDescriptor(storages);
+    return DFSTestUtil.toDatanodeDescriptor(storages);
   }
 
   private static final DatanodeStorageInfo[] storagesInBoundaryCase;
@@ -142,60 +128,7 @@ public class TestReplicationPolicyWithNodeGroup {
     dataNodesForDependencies = DFSTestUtil.toDatanodeDescriptor(storagesForDependencies);
     
   };
-  
-  @Before
-  public void setUp() throws Exception {
-    FileSystem.setDefaultUri(CONF, "hdfs://localhost:0");
-    CONF.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
-    // Set properties to make HDFS aware of NodeGroup.
-    CONF.set(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, 
-        BlockPlacementPolicyWithNodeGroup.class.getName());
-    CONF.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY, 
-        NetworkTopologyWithNodeGroup.class.getName());
-    
-    CONF.setBoolean(DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, true);
-    
-    File baseDir = PathUtils.getTestDir(TestReplicationPolicyWithNodeGroup.class);
-    
-    CONF.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
-        new File(baseDir, "name").getPath());
-    
-    DFSTestUtil.formatNameNode(CONF);
-    namenode = new NameNode(CONF);
-    final BlockManager bm = namenode.getNamesystem().getBlockManager();
-    replicator = bm.getBlockPlacementPolicy();
-    cluster = bm.getDatanodeManager().getNetworkTopology();
-    // construct network topology
-    for(int i=0; i<NUM_OF_DATANODES; i++) {
-      cluster.add(dataNodes[i]);
-    }
-    setupDataNodeCapacity();
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    namenode.stop();
-  }
-  
-  private static void updateHeartbeatWithUsage(DatanodeDescriptor dn,
-      long capacity, long dfsUsed, long remaining, long blockPoolUsed,
-      long dnCacheCapacity, long dnCacheUsed, int xceiverCount,
-      int volFailures) {
-    dn.getStorageInfos()[0].setUtilizationForTesting(
-        capacity, dfsUsed, remaining, blockPoolUsed);
-    dn.updateHeartbeat(
-        BlockManagerTestUtil.getStorageReportsForDatanode(dn),
-        dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures, null);
-  }
 
-  private static void setupDataNodeCapacity() {
-    for(int i=0; i<NUM_OF_DATANODES; i++) {
-      updateHeartbeatWithUsage(dataNodes[i],
-          2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
-          2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
-    }
-  }
-  
   /**
    * Scan the targets list: all targets should be on different NodeGroups.
    * Return false if two targets are found on the same NodeGroup.
@@ -217,10 +150,6 @@ public class TestReplicationPolicyWithNodeGroup {
     return true;
   }
 
-  private boolean isOnSameRack(DatanodeStorageInfo left, DatanodeStorageInfo right) {
-    return isOnSameRack(left.getDatanodeDescriptor(), right);
-  }
-
   private boolean isOnSameRack(DatanodeDescriptor left, DatanodeStorageInfo right) {
     return cluster.isOnSameRack(left, right.getDatanodeDescriptor());
   }
@@ -233,35 +162,6 @@ public class TestReplicationPolicyWithNodeGroup {
     return cluster.isOnSameNodeGroup(left, right.getDatanodeDescriptor());
   }
 
-  private DatanodeStorageInfo[] chooseTarget(int numOfReplicas) {
-    return chooseTarget(numOfReplicas, dataNodes[0]);
-  }
-
-  private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
-      DatanodeDescriptor writer) {
-    return chooseTarget(numOfReplicas, writer,
-        new ArrayList<DatanodeStorageInfo>());
-  }
-
-  private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
-      List<DatanodeStorageInfo> chosenNodes) {
-    return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes);
-  }
-
-  private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
-      DatanodeDescriptor writer, List<DatanodeStorageInfo> chosenNodes) {
-    return chooseTarget(numOfReplicas, writer, chosenNodes, null);
-  }
-
-  private DatanodeStorageInfo[] chooseTarget(
-      int numOfReplicas,
-      DatanodeDescriptor writer,
-      List<DatanodeStorageInfo> chosenNodes,
-      Set<Node> excludedNodes) {
-    return replicator.chooseTarget(filename, numOfReplicas, writer, chosenNodes,
-        false, excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
-  }
-
   /**
    * In this testcase, client is dataNodes[0]. So the 1st replica should be
    * placed on dataNodes[0], the 2nd replica should be placed on 
@@ -467,7 +367,7 @@ public class TestReplicationPolicyWithNodeGroup {
    */
   @Test
   public void testChooseTarget5() throws Exception {
-    setupDataNodeCapacity();
+    updateHeartbeatWithUsage();
     DatanodeStorageInfo[] targets;
     targets = chooseTarget(0, NODE);
     assertEquals(targets.length, 0);
@@ -514,7 +414,7 @@ public class TestReplicationPolicyWithNodeGroup {
    */
   @Test
   public void testRereplicate1() throws Exception {
-    setupDataNodeCapacity();
+    updateHeartbeatWithUsage();
     List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
     chosenNodes.add(storages[0]);
     DatanodeStorageInfo[] targets;
@@ -547,7 +447,7 @@ public class TestReplicationPolicyWithNodeGroup {
    */
   @Test
   public void testRereplicate2() throws Exception {
-    setupDataNodeCapacity();
+    updateHeartbeatWithUsage();
     List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
     chosenNodes.add(storages[0]);
     chosenNodes.add(storages[1]);
@@ -575,7 +475,7 @@ public class TestReplicationPolicyWithNodeGroup {
    */
   @Test
   public void testRereplicate3() throws Exception {
-    setupDataNodeCapacity();
+    updateHeartbeatWithUsage();
     List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
     chosenNodes.add(storages[0]);
     chosenNodes.add(storages[3]);
@@ -671,19 +571,14 @@ public class TestReplicationPolicyWithNodeGroup {
    */
   @Test
   public void testChooseTargetsOnBoundaryTopology() throws Exception {
-    for(int i=0; i<NUM_OF_DATANODES; i++) {
+    for(int i=0; i<dataNodes.length; i++) {
       cluster.remove(dataNodes[i]);
     }
 
-    for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
+    for(int i=0; i<dataNodesInBoundaryCase.length; i++) {
       cluster.add(dataNodesInBoundaryCase[i]);
     }
-    for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
-      updateHeartbeatWithUsage(dataNodes[0],
-                2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
-                (HdfsServerConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE,
-                0L, 0L, 0L, 0, 0);
-
+    for(int i=0; i<dataNodesInBoundaryCase.length; i++) {
       updateHeartbeatWithUsage(dataNodesInBoundaryCase[i],
           2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
           2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
@@ -714,7 +609,7 @@ public class TestReplicationPolicyWithNodeGroup {
    */
   @Test
   public void testRereplicateOnBoundaryTopology() throws Exception {
-    for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
+    for(int i=0; i<dataNodesInBoundaryCase.length; i++) {
       updateHeartbeatWithUsage(dataNodesInBoundaryCase[i],
           2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
           2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
@@ -738,21 +633,21 @@ public class TestReplicationPolicyWithNodeGroup {
    */
   @Test
   public void testChooseMoreTargetsThanNodeGroups() throws Exception {
-    for(int i=0; i<NUM_OF_DATANODES; i++) {
+    for(int i=0; i<dataNodes.length; i++) {
       cluster.remove(dataNodes[i]);
     }
-    for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
+    for(int i=0; i<dataNodesInBoundaryCase.length; i++) {
       DatanodeDescriptor node = dataNodesInBoundaryCase[i];
       if (cluster.contains(node)) {
         cluster.remove(node);
       }
     }
 
-    for(int i=0; i<NUM_OF_DATANODES_MORE_TARGETS; i++) {
+    for(int i=0; i<dataNodesInMoreTargetsCase.length; i++) {
       cluster.add(dataNodesInMoreTargetsCase[i]);
     }
 
-    for(int i=0; i<NUM_OF_DATANODES_MORE_TARGETS; i++) {
+    for(int i=0; i<dataNodesInMoreTargetsCase.length; i++) {
       updateHeartbeatWithUsage(dataNodesInMoreTargetsCase[i],
           2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
           2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
@@ -773,11 +668,11 @@ public class TestReplicationPolicyWithNodeGroup {
 
   @Test
   public void testChooseTargetWithDependencies() throws Exception {
-    for(int i=0; i<NUM_OF_DATANODES; i++) {
+    for(int i=0; i<dataNodes.length; i++) {
       cluster.remove(dataNodes[i]);
     }
     
-    for(int i=0; i<NUM_OF_DATANODES_MORE_TARGETS; i++) {
+    for(int i=0; i<dataNodesInMoreTargetsCase.length; i++) {
       DatanodeDescriptor node = dataNodesInMoreTargetsCase[i];
       if (cluster.contains(node)) {
         cluster.remove(node);
@@ -787,7 +682,7 @@ public class TestReplicationPolicyWithNodeGroup {
     Host2NodesMap host2DatanodeMap = namenode.getNamesystem()
         .getBlockManager()
         .getDatanodeManager().getHost2DatanodeMap();
-    for(int i=0; i<NUM_OF_DATANODES_FOR_DEPENDENCIES; i++) {
+    for(int i=0; i<dataNodesForDependencies.length; i++) {
       cluster.add(dataNodesForDependencies[i]);
       host2DatanodeMap.add(dataNodesForDependencies[i]);
     }
@@ -803,7 +698,7 @@ public class TestReplicationPolicyWithNodeGroup {
         dataNodesForDependencies[3].getHostName());
     
     //Update heartbeat
-    for(int i=0; i<NUM_OF_DATANODES_FOR_DEPENDENCIES; i++) {
+    for(int i=0; i<dataNodesForDependencies.length; i++) {
       updateHeartbeatWithUsage(dataNodesForDependencies[i],
           2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
           2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
@@ -825,8 +720,8 @@ public class TestReplicationPolicyWithNodeGroup {
     assertTrue(targets[1].equals(storagesForDependencies[3]) || targets[1].equals(storagesForDependencies[4]));
     
     //verify that all data nodes are in the excluded list
-    assertEquals(excludedNodes.size(), NUM_OF_DATANODES_FOR_DEPENDENCIES);
-    for(int i=0; i<NUM_OF_DATANODES_FOR_DEPENDENCIES; i++) {
+    assertEquals(excludedNodes.size(), dataNodesForDependencies.length);
+    for(int i=0; i<dataNodesForDependencies.length; i++) {
       assertTrue(excludedNodes.contains(dataNodesForDependencies[i]));
     }
   }