|
@@ -48,7 +48,6 @@ import static org.junit.Assert.assertEquals;
|
|
|
import static org.junit.Assert.assertTrue;
|
|
|
import static org.junit.Assert.fail;
|
|
|
import static org.junit.Assume.assumeTrue;
|
|
|
-
|
|
|
import java.io.File;
|
|
|
import java.io.IOException;
|
|
|
import java.io.OutputStream;
|
|
@@ -101,6 +100,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
|
|
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithUpgradeDomain;
|
|
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
|
|
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
|
|
|
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
|
|
@@ -126,6 +126,7 @@ public class TestBalancer {
|
|
|
|
|
|
static {
|
|
|
GenericTestUtils.setLogLevel(Balancer.LOG, Level.ALL);
|
|
|
+ GenericTestUtils.setLogLevel(Dispatcher.LOG, Level.DEBUG);
|
|
|
}
|
|
|
|
|
|
final static long CAPACITY = 5000L;
|
|
@@ -765,6 +766,7 @@ public class TestBalancer {
|
|
|
* parsing, etc. Otherwise invoke balancer API directly.
|
|
|
* @param useFile - if true, the hosts to included or excluded will be stored in a
|
|
|
* file and then later read from the file.
|
|
|
+ * @param useNamesystemSpy - spy on FSNamesystem if true
|
|
|
* @throws Exception
|
|
|
*/
|
|
|
private void doTest(Configuration conf, long[] capacities,
|
|
@@ -777,15 +779,21 @@ public class TestBalancer {
|
|
|
LOG.info("useTool = " + useTool);
|
|
|
assertEquals(capacities.length, racks.length);
|
|
|
int numOfDatanodes = capacities.length;
|
|
|
- cluster = new MiniDFSCluster.Builder(conf)
|
|
|
- .numDataNodes(capacities.length)
|
|
|
- .racks(racks)
|
|
|
- .simulatedCapacities(capacities)
|
|
|
- .build();
|
|
|
+
|
|
|
try {
|
|
|
+ cluster = new MiniDFSCluster.Builder(conf)
|
|
|
+ .numDataNodes(0)
|
|
|
+ .build();
|
|
|
+ cluster.getConfiguration(0).setInt(DFSConfigKeys.DFS_REPLICATION_KEY,
|
|
|
+ DFSConfigKeys.DFS_REPLICATION_DEFAULT);
|
|
|
+ conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY,
|
|
|
+ DFSConfigKeys.DFS_REPLICATION_DEFAULT);
|
|
|
+ cluster.startDataNodes(conf, numOfDatanodes, true,
|
|
|
+ StartupOption.REGULAR, racks, null, capacities, false);
|
|
|
+ cluster.waitClusterUp();
|
|
|
cluster.waitActive();
|
|
|
- client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(),
|
|
|
- ClientProtocol.class).getProxy();
|
|
|
+ client = NameNodeProxies.createProxy(conf,
|
|
|
+ cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
|
|
|
|
|
|
long totalCapacity = sum(capacities);
|
|
|
|
|
@@ -865,7 +873,9 @@ public class TestBalancer {
|
|
|
runBalancer(conf, totalUsedSpace, totalCapacity, p, expectedExcludedNodes);
|
|
|
}
|
|
|
} finally {
|
|
|
- cluster.shutdown();
|
|
|
+ if(cluster != null) {
|
|
|
+ cluster.shutdown();
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1906,6 +1916,26 @@ public class TestBalancer {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Test that makes the Balancer to disperse RPCs to the NameNode
|
|
|
+ * in order to avoid NN's RPC queue saturation.
|
|
|
+ */
|
|
|
+ void testBalancerRPCDelay() throws Exception {
|
|
|
+ final Configuration conf = new HdfsConfiguration();
|
|
|
+ initConf(conf);
|
|
|
+ conf.setInt(DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY, 30);
|
|
|
+
|
|
|
+ int numDNs = 40;
|
|
|
+ long[] capacities = new long[numDNs];
|
|
|
+ String[] racks = new String[numDNs];
|
|
|
+ for(int i = 0; i < numDNs; i++) {
|
|
|
+ capacities[i] = CAPACITY;
|
|
|
+ racks[i] = (i < numDNs/2 ? RACK0 : RACK1);
|
|
|
+ }
|
|
|
+ doTest(conf, capacities, racks, CAPACITY, RACK2,
|
|
|
+ new PortNumberBasedNodes(3, 0, 0), false, false);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* @param args
|
|
|
*/
|