|
@@ -0,0 +1,215 @@
|
|
|
+/**
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
+ * distributed with this work for additional information
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
+ * <p>
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ * <p>
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
+ * limitations under the License.
|
|
|
+ */
|
|
|
+
|
|
|
+package org.apache.hadoop.hdfs.server.blockmanagement;
|
|
|
+
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.FileSystem;
|
|
|
+import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
|
+import org.apache.hadoop.hdfs.DFSTestUtil;
|
|
|
+import org.apache.hadoop.hdfs.HdfsConfiguration;
|
|
|
+import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
|
|
|
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
|
+import org.apache.hadoop.net.NetworkTopology;
|
|
|
+import org.apache.hadoop.net.Node;
|
|
|
+import org.apache.hadoop.test.PathUtils;
|
|
|
+import org.junit.AfterClass;
|
|
|
+import org.junit.Assert;
|
|
|
+import org.junit.BeforeClass;
|
|
|
+import org.junit.Test;
|
|
|
+
|
|
|
+import java.io.File;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.Collection;
|
|
|
+import java.util.Collections;
|
|
|
+import java.util.HashSet;
|
|
|
+
|
|
|
+import static org.junit.Assert.assertEquals;
|
|
|
+
|
|
|
+/**
|
|
|
+ * Tests AvailableSpaceRackFaultTolerant block placement policy.
|
|
|
+ */
|
|
|
+public class TestAvailableSpaceRackFaultTolerantBPP {
|
|
|
+ private final static int NUM_RACKS = 4;
|
|
|
+ private final static int NODES_PER_RACK = 5;
|
|
|
+ private final static int BLOCK_SIZE = 1024;
|
|
|
+ private final static int CHOOSE_TIMES = 10000;
|
|
|
+ private final static String FILE = "/tobers/test";
|
|
|
+ private final static int REPLICA = 3;
|
|
|
+
|
|
|
+ private static DatanodeStorageInfo[] storages;
|
|
|
+ private static DatanodeDescriptor[] dataNodes;
|
|
|
+ private static Configuration conf;
|
|
|
+ private static NameNode namenode;
|
|
|
+ private static BlockPlacementPolicy placementPolicy;
|
|
|
+ private static NetworkTopology cluster;
|
|
|
+
|
|
|
+ @BeforeClass
|
|
|
+ public static void setupCluster() throws Exception {
|
|
|
+ conf = new HdfsConfiguration();
|
|
|
+ conf.setFloat(
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_AVAILABLE_SPACE_BLOCK_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY,
|
|
|
+ 0.6f);
|
|
|
+ String[] racks = new String[NUM_RACKS];
|
|
|
+ for (int i = 0; i < NUM_RACKS; i++) {
|
|
|
+ racks[i] = "/rack" + i;
|
|
|
+ }
|
|
|
+
|
|
|
+ String[] owerRackOfNodes = new String[NUM_RACKS * NODES_PER_RACK];
|
|
|
+ for (int i = 0; i < NODES_PER_RACK; i++) {
|
|
|
+ for (int j = 0; j < NUM_RACKS; j++) {
|
|
|
+ owerRackOfNodes[i * NUM_RACKS + j] = racks[j];
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ storages = DFSTestUtil.createDatanodeStorageInfos(owerRackOfNodes);
|
|
|
+ dataNodes = DFSTestUtil.toDatanodeDescriptor(storages);
|
|
|
+
|
|
|
+ FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
|
|
|
+ conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
|
|
|
+ File baseDir = PathUtils
|
|
|
+ .getTestDir(AvailableSpaceRackFaultTolerantBlockPlacementPolicy.class);
|
|
|
+ conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
|
|
|
+ new File(baseDir, "name").getPath());
|
|
|
+ conf.set(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
|
|
|
+ AvailableSpaceRackFaultTolerantBlockPlacementPolicy.class.getName());
|
|
|
+
|
|
|
+ DFSTestUtil.formatNameNode(conf);
|
|
|
+ namenode = new NameNode(conf);
|
|
|
+
|
|
|
+ final BlockManager bm = namenode.getNamesystem().getBlockManager();
|
|
|
+ placementPolicy = bm.getBlockPlacementPolicy();
|
|
|
+ cluster = bm.getDatanodeManager().getNetworkTopology();
|
|
|
+ for (int i = 0; i < NODES_PER_RACK * NUM_RACKS; i++) {
|
|
|
+ cluster.add(dataNodes[i]);
|
|
|
+ }
|
|
|
+
|
|
|
+ setupDataNodeCapacity();
|
|
|
+ }
|
|
|
+
|
|
|
+ private static void updateHeartbeatWithUsage(DatanodeDescriptor dn,
|
|
|
+ long capacity, long dfsUsed, long remaining, long blockPoolUsed,
|
|
|
+ long dnCacheCapacity, long dnCacheUsed, int xceiverCount,
|
|
|
+ int volFailures) {
|
|
|
+ dn.getStorageInfos()[0]
|
|
|
+ .setUtilizationForTesting(capacity, dfsUsed, remaining, blockPoolUsed);
|
|
|
+ dn.updateHeartbeat(BlockManagerTestUtil.getStorageReportsForDatanode(dn),
|
|
|
+ dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures, null);
|
|
|
+ }
|
|
|
+
|
|
|
+ private static void setupDataNodeCapacity() {
|
|
|
+ for (int i = 0; i < NODES_PER_RACK * NUM_RACKS; i++) {
|
|
|
+ if ((i % 2) == 0) {
|
|
|
+ // remaining 100%
|
|
|
+ updateHeartbeatWithUsage(dataNodes[i],
|
|
|
+ 2 * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE, 0L,
|
|
|
+ 2 * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE, 0L, 0L,
|
|
|
+ 0L, 0, 0);
|
|
|
+ } else {
|
|
|
+ // remaining 50%
|
|
|
+ updateHeartbeatWithUsage(dataNodes[i],
|
|
|
+ 2 * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE,
|
|
|
+ HdfsServerConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE,
|
|
|
+ HdfsServerConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE, 0L, 0L, 0L,
|
|
|
+ 0, 0);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * To verify that the BlockPlacementPolicy can be replaced by
|
|
|
+ * AvailableSpaceRackFaultTolerantBlockPlacementPolicy via
|
|
|
+ * changing the configuration.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testPolicyReplacement() {
|
|
|
+ Assert.assertTrue(
|
|
|
+ (placementPolicy instanceof
|
|
|
+ AvailableSpaceRackFaultTolerantBlockPlacementPolicy));
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Call choose target many times and verify that nodes with more remaining
|
|
|
+ * percent will be chosen with high possibility.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testChooseTarget() {
|
|
|
+ int total = 0;
|
|
|
+ int moreRemainingNode = 0;
|
|
|
+ for (int i = 0; i < CHOOSE_TIMES; i++) {
|
|
|
+ DatanodeStorageInfo[] targets =
|
|
|
+ namenode.getNamesystem().getBlockManager().getBlockPlacementPolicy()
|
|
|
+ .chooseTarget(FILE, REPLICA, null,
|
|
|
+ new ArrayList<DatanodeStorageInfo>(), false, null, BLOCK_SIZE,
|
|
|
+ TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null);
|
|
|
+
|
|
|
+ Assert.assertTrue(targets.length == REPLICA);
|
|
|
+ for (int j = 0; j < REPLICA; j++) {
|
|
|
+ total++;
|
|
|
+ if (targets[j].getDatanodeDescriptor().getRemainingPercent() > 60) {
|
|
|
+ moreRemainingNode++;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ Assert.assertTrue(total == REPLICA * CHOOSE_TIMES);
|
|
|
+ double possibility = 1.0 * moreRemainingNode / total;
|
|
|
+ Assert.assertTrue(possibility > 0.52);
|
|
|
+ Assert.assertTrue(possibility < 0.55);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testChooseDataNode() {
|
|
|
+ try {
|
|
|
+ Collection<Node> allNodes = new ArrayList<>(dataNodes.length);
|
|
|
+ Collections.addAll(allNodes, dataNodes);
|
|
|
+ if (placementPolicy instanceof AvailableSpaceBlockPlacementPolicy) {
|
|
|
+ // exclude all datanodes when chooseDataNode, no NPE should be thrown
|
|
|
+ ((AvailableSpaceRackFaultTolerantBlockPlacementPolicy) placementPolicy)
|
|
|
+ .chooseDataNode("~", allNodes);
|
|
|
+ }
|
|
|
+ } catch (NullPointerException npe) {
|
|
|
+ Assert.fail("NPE should not be thrown");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test if the nodes are all spread across all racks.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testMaxRackAllocation() {
|
|
|
+ DatanodeStorageInfo[] targets =
|
|
|
+ namenode.getNamesystem().getBlockManager().getBlockPlacementPolicy()
|
|
|
+ .chooseTarget(FILE, REPLICA, null,
|
|
|
+ new ArrayList<DatanodeStorageInfo>(), false, null, BLOCK_SIZE,
|
|
|
+ TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null);
|
|
|
+ HashSet<String> racks = new HashSet<String>();
|
|
|
+ for (int i = 0; i < targets.length; i++) {
|
|
|
+ racks.add(targets[i].getDatanodeDescriptor().getNetworkLocation());
|
|
|
+
|
|
|
+ }
|
|
|
+ assertEquals(REPLICA, racks.size());
|
|
|
+ }
|
|
|
+
|
|
|
+ @AfterClass
|
|
|
+ public static void teardownCluster() {
|
|
|
+ if (namenode != null) {
|
|
|
+ namenode.stop();
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|