@@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.net.NetworkTopology;
 import org.junit.Test;
+import junit.framework.Assert;
 
 /**
  * This class tests if a balancer schedules tasks correctly.
@@ -174,12 +175,26 @@ public class TestBalancerWithNodeGroup {
     LOG.info("Rebalancing with default factor.");
     waitForBalancer(totalUsedSpace, totalCapacity);
   }
+
+  private void runBalancerCanFinish(Configuration conf,
+      long totalUsedSpace, long totalCapacity) throws Exception {
+    waitForHeartBeat(totalUsedSpace, totalCapacity);
+
+    // start rebalancing
+    Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+    final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
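+    // NO_MOVE_PROGRESS is also a clean exit: the balancer stops once no block can be moved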
+    Assert.assertTrue(r == Balancer.ReturnStatus.SUCCESS.code ||
+        (r == Balancer.ReturnStatus.NO_MOVE_PROGRESS.code));
+    waitForHeartBeat(totalUsedSpace, totalCapacity);
+    LOG.info("Rebalancing with default factor.");
+  }
 
   /**
    * Create a cluster with even distribution, and a new empty node is added to
    * the cluster, then test rack locality for balancer policy.
    */
-  @Test
+  @Test(timeout=60000)
   public void testBalancerWithRackLocality() throws Exception {
     Configuration conf = createConf();
     long[] capacities = new long[]{CAPACITY, CAPACITY};
@@ -217,7 +232,7 @@ public class TestBalancerWithNodeGroup {
     totalCapacity += newCapacity;
 
     // run balancer and validate results
-    runBalancer(conf, totalUsedSpace, totalCapacity);
+    runBalancerCanFinish(conf, totalUsedSpace, totalCapacity);
 
     DatanodeInfo[] datanodeReport =
         client.getDatanodeReport(DatanodeReportType.ALL);
@@ -245,7 +260,7 @@ public class TestBalancerWithNodeGroup {
    * Create a cluster with even distribution, and a new empty node is added to
    * the cluster, then test node-group locality for balancer policy.
    */
-  @Test
+  @Test(timeout=60000)
   public void testBalancerWithNodeGroup() throws Exception {
     Configuration conf = createConf();
     long[] capacities = new long[]{CAPACITY, CAPACITY, CAPACITY, CAPACITY};
@@ -289,4 +304,51 @@ public class TestBalancerWithNodeGroup {
       cluster.shutdown();
     }
   }
+
+  /**
+   * Create a 4-node cluster: two nodes (n0, n1) in RACK0/NODEGROUP0, one node
+   * (n2) in RACK1/NODEGROUP1, and one node (n3) in RACK1/NODEGROUP2. Fill the
+   * cluster to 60% full with replication factor 3, so that n2 and n3 each hold
+   * a replica of every block under the replica placement policy with NodeGroup.
+   * As a result, n2 and n3 are filled to 80% (60% x 4 / 3), and the balancer
+   * policy with node group cannot migrate any block off n2 or n3 to n0 or n1.
+   * Thus, we expect the balancer to exit after 5 iterations without moving a block.
+   */
+  @Test(timeout=60000)
+  public void testBalancerEndInNoMoveProgress() throws Exception {
+    Configuration conf = createConf();
+    long[] capacities = new long[]{CAPACITY, CAPACITY, CAPACITY, CAPACITY};
+    String[] racks = new String[]{RACK0, RACK0, RACK1, RACK1};
+    String[] nodeGroups = new String[]{NODEGROUP0, NODEGROUP0, NODEGROUP1, NODEGROUP2};
+
+    int numOfDatanodes = capacities.length;
+    assertEquals(numOfDatanodes, racks.length);
+    assertEquals(numOfDatanodes, nodeGroups.length);
+    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(capacities.length)
+        .racks(racks)
+        .simulatedCapacities(capacities);
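+    // register the node groups before building the cluster so each DataNode starts in its group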
+    MiniDFSClusterWithNodeGroup.setNodeGroups(nodeGroups);
+    cluster = new MiniDFSClusterWithNodeGroup(builder);
+    try {
+      cluster.waitActive();
+      client = NameNodeProxies.createProxy(conf,
+          cluster.getFileSystem(0).getUri(),
+          ClientProtocol.class).getProxy();
+
+      long totalCapacity = TestBalancer.sum(capacities);
+      // fill the cluster to 60% full
+      long totalUsedSpace = totalCapacity * 6 / 10;
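+      // with replication 3, writing totalUsedSpace / 3 bytes of file data occupies totalUsedSpace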
+      TestBalancer.createFile(cluster, filePath, totalUsedSpace / 3,
+          (short) (3), 0);
+
+      // run the balancer; it should finish within 5 iterations even though no block can move
+      runBalancerCanFinish(conf, totalUsedSpace, totalCapacity);
+
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }