|
@@ -50,6 +50,9 @@ import org.junit.AfterClass;
|
|
|
import static org.junit.Assert.assertEquals;
|
|
|
import static org.junit.Assert.assertTrue;
|
|
|
import static org.junit.Assert.fail;
|
|
|
+import static org.mockito.Mockito.any;
|
|
|
+import static org.mockito.Mockito.anyLong;
|
|
|
+import static org.mockito.Mockito.doAnswer;
|
|
|
|
|
|
import java.io.File;
|
|
|
import java.io.IOException;
|
|
@@ -105,9 +108,14 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
|
|
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithUpgradeDomain;
|
|
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
|
|
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
|
|
|
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
|
|
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
|
|
|
import org.apache.hadoop.http.HttpConfig;
|
|
|
import org.apache.hadoop.io.IOUtils;
|
|
|
import org.apache.hadoop.minikdc.MiniKdc;
|
|
@@ -121,6 +129,8 @@ import org.apache.hadoop.util.Tool;
|
|
|
import org.apache.log4j.Level;
|
|
|
import org.junit.After;
|
|
|
import org.junit.Test;
|
|
|
+import org.mockito.invocation.InvocationOnMock;
|
|
|
+import org.mockito.stubbing.Answer;
|
|
|
|
|
|
/**
|
|
|
* This class tests if a balancer schedules tasks correctly.
|
|
@@ -130,6 +140,7 @@ public class TestBalancer {
|
|
|
|
|
|
static {
|
|
|
GenericTestUtils.setLogLevel(Balancer.LOG, Level.ALL);
|
|
|
+ GenericTestUtils.setLogLevel(Dispatcher.LOG, Level.DEBUG);
|
|
|
}
|
|
|
|
|
|
final static long CAPACITY = 5000L;
|
|
@@ -776,6 +787,13 @@ public class TestBalancer {
|
|
|
doTest(conf, capacities, racks, newCapacity, newRack, null, useTool, false);
|
|
|
}
|
|
|
|
|
|
+ private void doTest(Configuration conf, long[] capacities, String[] racks,
|
|
|
+ long newCapacity, String newRack, NewNodeInfo nodes,
|
|
|
+ boolean useTool, boolean useFile) throws Exception {
|
|
|
+ doTest(conf, capacities, racks, newCapacity, newRack, nodes,
|
|
|
+ useTool, useFile, false);
|
|
|
+ }
|
|
|
+
|
|
|
/** This test start a cluster with specified number of nodes,
|
|
|
* and fills it to be 30% full (with a single file replicated identically
|
|
|
* to all datanodes);
|
|
@@ -791,11 +809,13 @@ public class TestBalancer {
|
|
|
* parsing, etc. Otherwise invoke balancer API directly.
|
|
|
* @param useFile - if true, the hosts to included or excluded will be stored in a
|
|
|
* file and then later read from the file.
|
|
|
+ * @param useNamesystemSpy - spy on FSNamesystem if true
|
|
|
* @throws Exception
|
|
|
*/
|
|
|
private void doTest(Configuration conf, long[] capacities,
|
|
|
String[] racks, long newCapacity, String newRack, NewNodeInfo nodes,
|
|
|
- boolean useTool, boolean useFile) throws Exception {
|
|
|
+ boolean useTool, boolean useFile,
|
|
|
+ boolean useNamesystemSpy) throws Exception {
|
|
|
LOG.info("capacities = " + long2String(capacities));
|
|
|
LOG.info("racks = " + Arrays.asList(racks));
|
|
|
LOG.info("newCapacity= " + newCapacity);
|
|
@@ -803,15 +823,25 @@ public class TestBalancer {
|
|
|
LOG.info("useTool = " + useTool);
|
|
|
assertEquals(capacities.length, racks.length);
|
|
|
int numOfDatanodes = capacities.length;
|
|
|
- cluster = new MiniDFSCluster.Builder(conf)
|
|
|
- .numDataNodes(capacities.length)
|
|
|
- .racks(racks)
|
|
|
- .simulatedCapacities(capacities)
|
|
|
- .build();
|
|
|
+
|
|
|
try {
|
|
|
+ cluster = new MiniDFSCluster.Builder(conf)
|
|
|
+ .numDataNodes(0)
|
|
|
+ .build();
|
|
|
+ cluster.getConfiguration(0).setInt(DFSConfigKeys.DFS_REPLICATION_KEY,
|
|
|
+ DFSConfigKeys.DFS_REPLICATION_DEFAULT);
|
|
|
+ conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY,
|
|
|
+ DFSConfigKeys.DFS_REPLICATION_DEFAULT);
|
|
|
+ if(useNamesystemSpy) {
|
|
|
+ LOG.info("Using Spy Namesystem");
|
|
|
+ spyFSNamesystem(cluster.getNameNode());
|
|
|
+ }
|
|
|
+ cluster.startDataNodes(conf, numOfDatanodes, true,
|
|
|
+ StartupOption.REGULAR, racks, null, capacities, false);
|
|
|
+ cluster.waitClusterUp();
|
|
|
cluster.waitActive();
|
|
|
- client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(),
|
|
|
- ClientProtocol.class).getProxy();
|
|
|
+ client = NameNodeProxies.createProxy(conf,
|
|
|
+ cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
|
|
|
|
|
|
long totalCapacity = sum(capacities);
|
|
|
|
|
@@ -891,7 +921,9 @@ public class TestBalancer {
|
|
|
runBalancer(conf, totalUsedSpace, totalCapacity, p, expectedExcludedNodes);
|
|
|
}
|
|
|
} finally {
|
|
|
- cluster.shutdown();
|
|
|
+ if(cluster != null) {
|
|
|
+ cluster.shutdown();
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -2004,6 +2036,54 @@ public class TestBalancer {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private static int numGetBlocksCalls;
|
|
|
+ private static long startGetBlocksTime, endGetBlocksTime;
|
|
|
+
|
|
|
+ private void spyFSNamesystem(NameNode nn) throws IOException {
|
|
|
+ FSNamesystem fsnSpy = NameNodeAdapter.spyOnNamesystem(nn);
|
|
|
+ numGetBlocksCalls = 0;
|
|
|
+ endGetBlocksTime = startGetBlocksTime = Time.monotonicNow();
|
|
|
+ doAnswer(new Answer<BlocksWithLocations>() {
|
|
|
+ @Override
|
|
|
+ public BlocksWithLocations answer(InvocationOnMock invocation)
|
|
|
+ throws Throwable {
|
|
|
+ BlocksWithLocations blk =
|
|
|
+ (BlocksWithLocations)invocation.callRealMethod();
|
|
|
+ endGetBlocksTime = Time.monotonicNow();
|
|
|
+ numGetBlocksCalls++;
|
|
|
+ return blk;
|
|
|
+ }}).when(fsnSpy).getBlocks(any(DatanodeID.class), anyLong());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test that makes the Balancer disperse RPCs to the NameNode
|
|
|
+ * in order to avoid NN's RPC queue saturation.
|
|
|
+ */
|
|
|
+ void testBalancerRPCDelay() throws Exception {
|
|
|
+ final Configuration conf = new HdfsConfiguration();
|
|
|
+ initConf(conf);
|
|
|
+ conf.setInt(DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY, 30);
|
|
|
+
|
|
|
+ int numDNs = 40;
|
|
|
+ long[] capacities = new long[numDNs];
|
|
|
+ String[] racks = new String[numDNs];
|
|
|
+ for(int i = 0; i < numDNs; i++) {
|
|
|
+ capacities[i] = CAPACITY;
|
|
|
+ racks[i] = (i < numDNs/2 ? RACK0 : RACK1);
|
|
|
+ }
|
|
|
+ doTest(conf, capacities, racks, CAPACITY, RACK2,
|
|
|
+ new PortNumberBasedNodes(3, 0, 0), false, false, true);
|
|
|
+ assertTrue("Number of getBlocks should be not less than " +
|
|
|
+ Dispatcher.BALANCER_NUM_RPC_PER_SEC,
|
|
|
+ numGetBlocksCalls > Dispatcher.BALANCER_NUM_RPC_PER_SEC);
|
|
|
+ long d = 1 + endGetBlocksTime - startGetBlocksTime;
|
|
|
+ LOG.info("Balancer executed " + numGetBlocksCalls
|
|
|
+ + " getBlocks in " + d + " msec.");
|
|
|
+ assertTrue("Expected BALANCER_NUM_RPC_PER_SEC = " +
|
|
|
+ Dispatcher.BALANCER_NUM_RPC_PER_SEC,
|
|
|
+ (numGetBlocksCalls * 1000 / d) < Dispatcher.BALANCER_NUM_RPC_PER_SEC);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* @param args
|
|
|
*/
|