|
@@ -222,6 +222,69 @@ public class TestDiskBalancer {
|
|
|
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testDiskBalancerComputeDelay() throws Exception {
|
|
|
+
|
|
|
+ Configuration conf = new HdfsConfiguration();
|
|
|
+ conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
|
|
|
+
|
|
|
+ final int blockCount = 100;
|
|
|
+ final int blockSize = 11 * 1024 * 1024;
|
|
|
+ final int diskCount = 2;
|
|
|
+ final int dataNodeCount = 1;
|
|
|
+ final int dataNodeIndex = 0;
|
|
|
+ final long cap = blockSize * 2L * blockCount;
|
|
|
+
|
|
|
+ conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
|
|
|
+ conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize);
|
|
|
+
|
|
|
+ final MiniDFSCluster cluster = new ClusterBuilder()
|
|
|
+ .setBlockCount(blockCount).setBlockSize(blockSize)
|
|
|
+ .setDiskCount(diskCount).setNumDatanodes(dataNodeCount).setConf(conf)
|
|
|
+ .setCapacities(new long[] {cap, cap }).build();
|
|
|
+
|
|
|
+ try {
|
|
|
+ DataNode node = cluster.getDataNodes().get(dataNodeIndex);
|
|
|
+
|
|
|
+ final FsDatasetSpi<?> fsDatasetSpy = Mockito.spy(node.getFSDataset());
|
|
|
+ DiskBalancerWorkItem item = Mockito.spy(new DiskBalancerWorkItem());
|
|
|
+ // Mocking bandwidth as 10mb/sec.
|
|
|
+ Mockito.doReturn((long) 10).when(item).getBandwidth();
|
|
|
+
|
|
|
+ doAnswer(new Answer<Object>() {
|
|
|
+ public Object answer(InvocationOnMock invocation) {
|
|
|
+ try {
|
|
|
+ node.getFSDataset().moveBlockAcrossVolumes(
|
|
|
+ (ExtendedBlock) invocation.getArguments()[0],
|
|
|
+ (FsVolumeSpi) invocation.getArguments()[1]);
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.error(e.getMessage());
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }).when(fsDatasetSpy).moveBlockAcrossVolumes(any(ExtendedBlock.class),
|
|
|
+ any(FsVolumeSpi.class));
|
|
|
+
|
|
|
+ DiskBalancerMover diskBalancerMover = new DiskBalancerMover(fsDatasetSpy,
|
|
|
+ conf);
|
|
|
+
|
|
|
+ diskBalancerMover.setRunnable();
|
|
|
+
|
|
|
+ // bytesCopied - 20 * 1024 *1024 byteCopied.
|
|
|
+ // timeUsed - 1200 in milliseconds
|
|
|
+ // item - set DiskBalancerWorkItem bandwidth as 10
|
|
|
+ // Expect return sleep delay in Milliseconds. sleep value = bytesCopied /
|
|
|
+ // (1024*1024*bandwidth in MB/milli) - timeUsed;
|
|
|
+ long val = diskBalancerMover.computeDelay(20 * 1024 * 1024, 1200, item);
|
|
|
+ Assert.assertEquals(val, (long) 800);
|
|
|
+ } catch (Exception e) {
|
|
|
+ Assert.fail("Unexpected exception: " + e);
|
|
|
+ } finally {
|
|
|
+ if (cluster != null) {
|
|
|
+ cluster.shutdown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
@Test
|
|
|
public void testDiskBalancerWithFedClusterWithOneNameServiceEmpty() throws
|