|
@@ -72,8 +72,11 @@ import java.util.Properties;
|
|
|
import java.util.Random;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.TimeoutException;
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
+import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
+import org.junit.Before;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
|
@@ -157,6 +160,16 @@ public class TestBalancer {
|
|
|
private static MiniKdc kdc;
|
|
|
private static File keytabFile;
|
|
|
private MiniDFSCluster cluster;
|
|
|
+ private AtomicInteger numGetBlocksCalls;
|
|
|
+ private AtomicLong startGetBlocksTime;
|
|
|
+ private AtomicLong endGetBlocksTime;
|
|
|
+
|
|
|
+ @Before
|
|
|
+ public void setup() { // fresh getBlocks counter/timestamps for each test
|
|
|
+ numGetBlocksCalls = new AtomicInteger(0);
|
|
|
+ startGetBlocksTime = new AtomicLong(Long.MAX_VALUE); // sentinel: any observed start time is smaller
|
|
|
+ endGetBlocksTime = new AtomicLong(Long.MIN_VALUE); // sentinel: any observed end time is larger
|
|
|
+ }
|
|
|
|
|
|
@After
|
|
|
public void shutdown() throws Exception {
|
|
@@ -791,7 +804,7 @@ public class TestBalancer {
|
|
|
long newCapacity, String newRack, NewNodeInfo nodes,
|
|
|
boolean useTool, boolean useFile) throws Exception {
|
|
|
doTest(conf, capacities, racks, newCapacity, newRack, nodes,
|
|
|
- useTool, useFile, false);
|
|
|
+ useTool, useFile, false, 0.3);
|
|
|
}
|
|
|
|
|
|
/** This test starts a cluster with the specified number of nodes,
|
|
@@ -810,12 +823,14 @@ public class TestBalancer {
|
|
|
* @param useFile - if true, the hosts to be included or excluded will be stored in a
|
|
|
* file and then later read from the file.
|
|
|
* @param useNamesystemSpy - spy on FSNamesystem if true
|
|
|
+ * @param clusterUtilization - The utilization of the cluster to start, from
|
|
|
+ * 0.0 to 1.0
|
|
|
* @throws Exception
|
|
|
*/
|
|
|
private void doTest(Configuration conf, long[] capacities,
|
|
|
String[] racks, long newCapacity, String newRack, NewNodeInfo nodes,
|
|
|
boolean useTool, boolean useFile,
|
|
|
- boolean useNamesystemSpy) throws Exception {
|
|
|
+ boolean useNamesystemSpy, double clusterUtilization) throws Exception {
|
|
|
LOG.info("capacities = " + long2String(capacities));
|
|
|
LOG.info("racks = " + Arrays.asList(racks));
|
|
|
LOG.info("newCapacity= " + newCapacity);
|
|
@@ -845,8 +860,8 @@ public class TestBalancer {
|
|
|
|
|
|
long totalCapacity = sum(capacities);
|
|
|
|
|
|
- // fill up the cluster to be 30% full
|
|
|
- long totalUsedSpace = totalCapacity*3/10;
|
|
|
+ // fill up the cluster to be `clusterUtilization` full
|
|
|
+ long totalUsedSpace = (long) (totalCapacity * clusterUtilization);
|
|
|
createFile(cluster, filePath, totalUsedSpace / numOfDatanodes,
|
|
|
(short) numOfDatanodes, 0);
|
|
|
|
|
@@ -2135,33 +2150,34 @@ public class TestBalancer {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private static int numGetBlocksCalls;
|
|
|
- private static long startGetBlocksTime, endGetBlocksTime;
|
|
|
-
|
|
|
private void spyFSNamesystem(NameNode nn) throws IOException { // stubs getBlocks on a spy of nn's namesystem to time and count calls
|
|
|
FSNamesystem fsnSpy = NameNodeAdapter.spyOnNamesystem(nn);
|
|
|
- numGetBlocksCalls = 0;
|
|
|
- endGetBlocksTime = startGetBlocksTime = Time.monotonicNow();
|
|
|
doAnswer(new Answer<BlocksWithLocations>() {
|
|
|
@Override
|
|
|
public BlocksWithLocations answer(InvocationOnMock invocation)
|
|
|
throws Throwable {
|
|
|
+ long startTime = Time.monotonicNow();
|
|
|
+ startGetBlocksTime.getAndUpdate((curr) -> Math.min(curr, startTime)); // keep the earliest start seen across all calls
|
|
|
BlocksWithLocations blk =
|
|
|
(BlocksWithLocations)invocation.callRealMethod(); // delegate to the real getBlocks
|
|
|
- endGetBlocksTime = Time.monotonicNow();
|
|
|
- numGetBlocksCalls++;
|
|
|
+ long endTime = Time.monotonicNow();
|
|
|
+ endGetBlocksTime.getAndUpdate((curr) -> Math.max(curr, endTime)); // keep the latest end seen across all calls
|
|
|
+ numGetBlocksCalls.incrementAndGet(); // reached only when the real call returned without throwing
|
|
|
return blk;
|
|
|
}}).when(fsnSpy).getBlocks(any(DatanodeID.class), anyLong(), anyLong());
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Test that makes the Balancer to disperse RPCs to the NameNode
|
|
|
- * in order to avoid NN's RPC queue saturation.
|
|
|
+ * in order to avoid NN's RPC queue saturation. This is not marked as @Test
|
|
|
+ * because it is run from {@link TestBalancerRPCDelay}.
|
|
|
*/
|
|
|
- void testBalancerRPCDelay() throws Exception {
|
|
|
+ void testBalancerRPCDelay(int getBlocksMaxQps) throws Exception {
|
|
|
final Configuration conf = new HdfsConfiguration();
|
|
|
initConf(conf);
|
|
|
conf.setInt(DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY, 30);
|
|
|
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_GETBLOCKS_MAX_QPS_KEY,
|
|
|
+ getBlocksMaxQps);
|
|
|
|
|
|
int numDNs = 20;
|
|
|
long[] capacities = new long[numDNs];
|
|
@@ -2171,16 +2187,22 @@ public class TestBalancer {
|
|
|
racks[i] = (i < numDNs/2 ? RACK0 : RACK1);
|
|
|
}
|
|
|
doTest(conf, capacities, racks, CAPACITY, RACK2,
|
|
|
- new PortNumberBasedNodes(3, 0, 0), false, false, true);
|
|
|
+ // Use only 1 node and set the starting utilization to 50% to allow the
|
|
|
+ // balancing to complete in only one iteration. This is necessary
|
|
|
+ // because the startGetBlocksTime and endGetBlocksTime measure across
|
|
|
+ // all get block calls, so if two iterations are performed, the duration
|
|
|
+ // also includes the time it took to perform the block move ops in the
|
|
|
+ // first iteration
|
|
|
+ new PortNumberBasedNodes(1, 0, 0), false, false, true, 0.5);
|
|
|
assertTrue("Number of getBlocks should be not less than " +
|
|
|
- Dispatcher.BALANCER_NUM_RPC_PER_SEC,
|
|
|
- numGetBlocksCalls > Dispatcher.BALANCER_NUM_RPC_PER_SEC);
|
|
|
- long d = 1 + endGetBlocksTime - startGetBlocksTime;
|
|
|
- LOG.info("Balancer executed " + numGetBlocksCalls
|
|
|
- + " getBlocks in " + d + " msec.");
|
|
|
- assertTrue("Expected BALANCER_NUM_RPC_PER_SEC = " +
|
|
|
- Dispatcher.BALANCER_NUM_RPC_PER_SEC,
|
|
|
- (numGetBlocksCalls * 1000 / d) < Dispatcher.BALANCER_NUM_RPC_PER_SEC);
|
|
|
+ getBlocksMaxQps, numGetBlocksCalls.get() >= getBlocksMaxQps);
|
|
|
+ long durationMs = 1 + endGetBlocksTime.get() - startGetBlocksTime.get();
|
|
|
+ int durationSec = (int) Math.ceil(durationMs / 1000.0);
|
|
|
+ LOG.info("Balancer executed {} getBlocks in {} msec (round up to {} sec)",
|
|
|
+ numGetBlocksCalls.get(), durationMs, durationSec);
|
|
|
+ long getBlockCallsPerSecond = numGetBlocksCalls.get() / durationSec;
|
|
|
+ assertTrue("Expected balancer getBlocks calls per second <= " +
|
|
|
+ getBlocksMaxQps, getBlockCallsPerSecond <= getBlocksMaxQps);
|
|
|
}
|
|
|
|
|
|
/**
|