|
@@ -18,6 +18,8 @@
|
|
package org.apache.hadoop.hdfs.server.balancer;
|
|
package org.apache.hadoop.hdfs.server.balancer;
|
|
|
|
|
|
import static org.junit.Assert.assertEquals;
|
|
import static org.junit.Assert.assertEquals;
|
|
|
|
+import static org.junit.Assert.assertTrue;
|
|
|
|
+import static org.junit.Assert.assertFalse;
|
|
|
|
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.net.URI;
|
|
import java.net.URI;
|
|
@@ -44,11 +46,12 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
|
|
|
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
|
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithNodeGroup;
|
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithNodeGroup;
|
|
|
|
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
|
|
import org.apache.hadoop.net.NetworkTopology;
|
|
import org.apache.hadoop.net.NetworkTopology;
|
|
import org.apache.hadoop.net.NetworkTopologyWithNodeGroup;
|
|
import org.apache.hadoop.net.NetworkTopologyWithNodeGroup;
|
|
import org.apache.hadoop.test.LambdaTestUtils;
|
|
import org.apache.hadoop.test.LambdaTestUtils;
|
|
-import org.junit.Assert;
|
|
|
|
import org.junit.Test;
|
|
import org.junit.Test;
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -84,7 +87,7 @@ public class TestBalancerWithNodeGroup {
|
|
TestBalancer.initConf(conf);
|
|
TestBalancer.initConf(conf);
|
|
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
|
|
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
|
|
conf.setBoolean(DFSConfigKeys.DFS_USE_DFS_NETWORK_TOPOLOGY_KEY, false);
|
|
conf.setBoolean(DFSConfigKeys.DFS_USE_DFS_NETWORK_TOPOLOGY_KEY, false);
|
|
- conf.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
|
|
|
|
|
|
+ conf.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
|
|
NetworkTopologyWithNodeGroup.class.getName());
|
|
NetworkTopologyWithNodeGroup.class.getName());
|
|
conf.set(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
|
|
conf.set(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
|
|
BlockPlacementPolicyWithNodeGroup.class.getName());
|
|
BlockPlacementPolicyWithNodeGroup.class.getName());
|
|
@@ -192,8 +195,8 @@ public class TestBalancerWithNodeGroup {
|
|
// start rebalancing
|
|
// start rebalancing
|
|
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
|
|
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
|
|
final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
|
|
final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
|
|
- Assert.assertTrue(r == ExitStatus.SUCCESS.getExitCode() ||
|
|
|
|
- (r == ExitStatus.NO_MOVE_PROGRESS.getExitCode()));
|
|
|
|
|
|
+ assertEquals("Balancer did not exit with NO_MOVE_PROGRESS",
|
|
|
|
+ ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
|
|
waitForHeartBeat(totalUsedSpace, totalCapacity);
|
|
waitForHeartBeat(totalUsedSpace, totalCapacity);
|
|
LOG.info("Rebalancing with default factor.");
|
|
LOG.info("Rebalancing with default factor.");
|
|
}
|
|
}
|
|
@@ -211,6 +214,30 @@ public class TestBalancerWithNodeGroup {
|
|
return ret;
|
|
return ret;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private void verifyNetworkTopology() {
|
|
|
|
+ NetworkTopology topology =
|
|
|
|
+ cluster.getNamesystem().getBlockManager().getDatanodeManager().
|
|
|
|
+ getNetworkTopology();
|
|
|
|
+ assertTrue("must be an instance of NetworkTopologyWithNodeGroup",
|
|
|
|
+ topology instanceof NetworkTopologyWithNodeGroup);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void verifyProperBlockPlacement(String file,
|
|
|
|
+ long length, int numOfReplicas) throws IOException {
|
|
|
|
+ BlockPlacementPolicy placementPolicy =
|
|
|
|
+ cluster.getNamesystem().getBlockManager().getBlockPlacementPolicy();
|
|
|
|
+ List<LocatedBlock> locatedBlocks = client.
|
|
|
|
+ getBlockLocations(file, 0, length).getLocatedBlocks();
|
|
|
|
+ assertFalse("No blocks found for file " + file, locatedBlocks.isEmpty());
|
|
|
|
+ for (LocatedBlock locatedBlock : locatedBlocks) {
|
|
|
|
+ BlockPlacementStatus status = placementPolicy.verifyBlockPlacement(
|
|
|
|
+ locatedBlock.getLocations(), numOfReplicas);
|
|
|
|
+ assertTrue("Block placement policy was not satisfied for block " +
|
|
|
|
+ locatedBlock.getBlock().getBlockId(),
|
|
|
|
+ status.isPlacementPolicySatisfied());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Create a cluster with even distribution, and a new empty node is added to
|
|
* Create a cluster with even distribution, and a new empty node is added to
|
|
* the cluster, then test rack locality for balancer policy.
|
|
* the cluster, then test rack locality for balancer policy.
|
|
@@ -232,6 +259,7 @@ public class TestBalancerWithNodeGroup {
|
|
cluster = new MiniDFSClusterWithNodeGroup(builder);
|
|
cluster = new MiniDFSClusterWithNodeGroup(builder);
|
|
try {
|
|
try {
|
|
cluster.waitActive();
|
|
cluster.waitActive();
|
|
|
|
+ verifyNetworkTopology();
|
|
client = NameNodeProxies.createProxy(conf,
|
|
client = NameNodeProxies.createProxy(conf,
|
|
cluster.getFileSystem(0).getUri(),
|
|
cluster.getFileSystem(0).getUri(),
|
|
ClientProtocol.class).getProxy();
|
|
ClientProtocol.class).getProxy();
|
|
@@ -258,12 +286,14 @@ public class TestBalancerWithNodeGroup {
|
|
totalCapacity += newCapacity;
|
|
totalCapacity += newCapacity;
|
|
|
|
|
|
// run balancer and validate results
|
|
// run balancer and validate results
|
|
- runBalancerCanFinish(conf, totalUsedSpace, totalCapacity);
|
|
|
|
|
|
+ runBalancer(conf, totalUsedSpace, totalCapacity);
|
|
|
|
|
|
lbs = client.getBlockLocations(filePath.toUri().getPath(), 0, length);
|
|
lbs = client.getBlockLocations(filePath.toUri().getPath(), 0, length);
|
|
Set<ExtendedBlock> after = getBlocksOnRack(lbs.getLocatedBlocks(), RACK0);
|
|
Set<ExtendedBlock> after = getBlocksOnRack(lbs.getLocatedBlocks(), RACK0);
|
|
assertEquals(before, after);
|
|
assertEquals(before, after);
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ verifyProperBlockPlacement(filePath.toUri().getPath(), length,
|
|
|
|
+ numOfDatanodes);
|
|
} finally {
|
|
} finally {
|
|
cluster.shutdown();
|
|
cluster.shutdown();
|
|
}
|
|
}
|
|
@@ -291,15 +321,18 @@ public class TestBalancerWithNodeGroup {
|
|
cluster = new MiniDFSClusterWithNodeGroup(builder);
|
|
cluster = new MiniDFSClusterWithNodeGroup(builder);
|
|
try {
|
|
try {
|
|
cluster.waitActive();
|
|
cluster.waitActive();
|
|
|
|
+ verifyNetworkTopology();
|
|
client = NameNodeProxies.createProxy(conf,
|
|
client = NameNodeProxies.createProxy(conf,
|
|
cluster.getFileSystem(0).getUri(),
|
|
cluster.getFileSystem(0).getUri(),
|
|
ClientProtocol.class).getProxy();
|
|
ClientProtocol.class).getProxy();
|
|
|
|
|
|
long totalCapacity = TestBalancer.sum(capacities);
|
|
long totalCapacity = TestBalancer.sum(capacities);
|
|
|
|
+ int numOfReplicas = numOfDatanodes / 2;
|
|
// fill up the cluster to be 20% full
|
|
// fill up the cluster to be 20% full
|
|
long totalUsedSpace = totalCapacity * 2 / 10;
|
|
long totalUsedSpace = totalCapacity * 2 / 10;
|
|
- TestBalancer.createFile(cluster, filePath, totalUsedSpace / (numOfDatanodes/2),
|
|
|
|
- (short) (numOfDatanodes/2), 0);
|
|
|
|
|
|
+ long length = totalUsedSpace / numOfReplicas;
|
|
|
|
+ TestBalancer.createFile(cluster, filePath, length,
|
|
|
|
+ (short) numOfReplicas, 0);
|
|
|
|
|
|
long newCapacity = CAPACITY;
|
|
long newCapacity = CAPACITY;
|
|
String newRack = RACK1;
|
|
String newRack = RACK1;
|
|
@@ -313,6 +346,9 @@ public class TestBalancerWithNodeGroup {
|
|
// run balancer and validate results
|
|
// run balancer and validate results
|
|
runBalancer(conf, totalUsedSpace, totalCapacity);
|
|
runBalancer(conf, totalUsedSpace, totalCapacity);
|
|
|
|
|
|
|
|
+ verifyProperBlockPlacement(filePath.toUri().getPath(), length,
|
|
|
|
+ numOfReplicas);
|
|
|
|
+
|
|
} finally {
|
|
} finally {
|
|
cluster.shutdown();
|
|
cluster.shutdown();
|
|
}
|
|
}
|
|
@@ -345,6 +381,7 @@ public class TestBalancerWithNodeGroup {
|
|
cluster = new MiniDFSClusterWithNodeGroup(builder);
|
|
cluster = new MiniDFSClusterWithNodeGroup(builder);
|
|
try {
|
|
try {
|
|
cluster.waitActive();
|
|
cluster.waitActive();
|
|
|
|
+ verifyNetworkTopology();
|
|
client = NameNodeProxies.createProxy(conf,
|
|
client = NameNodeProxies.createProxy(conf,
|
|
cluster.getFileSystem(0).getUri(),
|
|
cluster.getFileSystem(0).getUri(),
|
|
ClientProtocol.class).getProxy();
|
|
ClientProtocol.class).getProxy();
|
|
@@ -352,12 +389,17 @@ public class TestBalancerWithNodeGroup {
|
|
long totalCapacity = TestBalancer.sum(capacities);
|
|
long totalCapacity = TestBalancer.sum(capacities);
|
|
// fill up the cluster to be 60% full
|
|
// fill up the cluster to be 60% full
|
|
long totalUsedSpace = totalCapacity * 6 / 10;
|
|
long totalUsedSpace = totalCapacity * 6 / 10;
|
|
- TestBalancer.createFile(cluster, filePath, totalUsedSpace / 3,
|
|
|
|
- (short) (3), 0);
|
|
|
|
|
|
+ int numOfReplicas = 3;
|
|
|
|
+ long length = totalUsedSpace / 3;
|
|
|
|
+ TestBalancer.createFile(cluster, filePath, length,
|
|
|
|
+ (short) numOfReplicas, 0);
|
|
|
|
|
|
// run balancer which can finish in 5 iterations with no block movement.
|
|
// run balancer which can finish in 5 iterations with no block movement.
|
|
runBalancerCanFinish(conf, totalUsedSpace, totalCapacity);
|
|
runBalancerCanFinish(conf, totalUsedSpace, totalCapacity);
|
|
|
|
|
|
|
|
+ verifyProperBlockPlacement(filePath.toUri().getPath(), length,
|
|
|
|
+ numOfReplicas);
|
|
|
|
+
|
|
} finally {
|
|
} finally {
|
|
cluster.shutdown();
|
|
cluster.shutdown();
|
|
}
|
|
}
|