|
@@ -24,13 +24,15 @@ import java.io.ByteArrayOutputStream;
|
|
|
import java.io.PrintStream;
|
|
|
import java.nio.charset.Charset;
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.concurrent.TimeoutException;
|
|
|
|
|
|
+import com.google.common.base.Supplier;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
-import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
|
|
import org.apache.hadoop.hdfs.tools.DFSAdmin;
|
|
|
+import org.apache.hadoop.test.GenericTestUtils;
|
|
|
import org.junit.Test;
|
|
|
|
|
|
/**
|
|
@@ -54,9 +56,8 @@ public class TestBalancerBandwidth {
|
|
|
DEFAULT_BANDWIDTH);
|
|
|
|
|
|
/* Create and start cluster */
|
|
|
- MiniDFSCluster cluster =
|
|
|
- new MiniDFSCluster.Builder(conf).numDataNodes(NUM_OF_DATANODES).build();
|
|
|
- try {
|
|
|
+ try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
|
|
+ .numDataNodes(NUM_OF_DATANODES).build()) {
|
|
|
cluster.waitActive();
|
|
|
|
|
|
DistributedFileSystem fs = cluster.getFileSystem();
|
|
@@ -65,12 +66,6 @@ public class TestBalancerBandwidth {
|
|
|
// Ensure value from the configuration is reflected in the datanodes.
|
|
|
assertEquals(DEFAULT_BANDWIDTH, (long) datanodes.get(0).getBalancerBandwidth());
|
|
|
assertEquals(DEFAULT_BANDWIDTH, (long) datanodes.get(1).getBalancerBandwidth());
|
|
|
- ClientDatanodeProtocol dn1Proxy = DFSUtilClient
|
|
|
- .createClientDatanodeProtocolProxy(datanodes.get(0).getDatanodeId(),
|
|
|
- conf, 60000, false);
|
|
|
- ClientDatanodeProtocol dn2Proxy = DFSUtilClient
|
|
|
- .createClientDatanodeProtocolProxy(datanodes.get(1).getDatanodeId(),
|
|
|
- conf, 60000, false);
|
|
|
DFSAdmin admin = new DFSAdmin(conf);
|
|
|
String dn1Address = datanodes.get(0).ipcServer.getListenerAddress()
|
|
|
.getHostName() + ":" + datanodes.get(0).getIpcPort();
|
|
@@ -79,51 +74,49 @@ public class TestBalancerBandwidth {
|
|
|
|
|
|
// verifies the dfsadmin command execution
|
|
|
String[] args = new String[] { "-getBalancerBandwidth", dn1Address };
|
|
|
- runGetBalancerBandwidthCmd(admin, args, dn1Proxy, DEFAULT_BANDWIDTH);
|
|
|
+ runGetBalancerBandwidthCmd(admin, args, DEFAULT_BANDWIDTH);
|
|
|
args = new String[] { "-getBalancerBandwidth", dn2Address };
|
|
|
- runGetBalancerBandwidthCmd(admin, args, dn2Proxy, DEFAULT_BANDWIDTH);
|
|
|
+ runGetBalancerBandwidthCmd(admin, args, DEFAULT_BANDWIDTH);
|
|
|
|
|
|
// Dynamically change balancer bandwidth and ensure the updated value
|
|
|
// is reflected on the datanodes.
|
|
|
long newBandwidth = 12 * DEFAULT_BANDWIDTH; // 12M bps
|
|
|
fs.setBalancerBandwidth(newBandwidth);
|
|
|
+ verifyBalancerBandwidth(datanodes, newBandwidth);
|
|
|
|
|
|
- // Give it a few seconds to propogate new the value to the datanodes.
|
|
|
- try {
|
|
|
- Thread.sleep(5000);
|
|
|
- } catch (Exception e) {}
|
|
|
-
|
|
|
- assertEquals(newBandwidth, (long) datanodes.get(0).getBalancerBandwidth());
|
|
|
- assertEquals(newBandwidth, (long) datanodes.get(1).getBalancerBandwidth());
|
|
|
// verifies the dfsadmin command execution
|
|
|
args = new String[] { "-getBalancerBandwidth", dn1Address };
|
|
|
- runGetBalancerBandwidthCmd(admin, args, dn1Proxy, newBandwidth);
|
|
|
+ runGetBalancerBandwidthCmd(admin, args, newBandwidth);
|
|
|
args = new String[] { "-getBalancerBandwidth", dn2Address };
|
|
|
- runGetBalancerBandwidthCmd(admin, args, dn2Proxy, newBandwidth);
|
|
|
+ runGetBalancerBandwidthCmd(admin, args, newBandwidth);
|
|
|
|
|
|
// Dynamically change balancer bandwidth to 0. Balancer bandwidth on the
|
|
|
// datanodes should remain as it was.
|
|
|
fs.setBalancerBandwidth(0);
|
|
|
|
|
|
- // Give it a few seconds to propogate new the value to the datanodes.
|
|
|
- try {
|
|
|
- Thread.sleep(5000);
|
|
|
- } catch (Exception e) {}
|
|
|
+ verifyBalancerBandwidth(datanodes, newBandwidth);
|
|
|
|
|
|
- assertEquals(newBandwidth, (long) datanodes.get(0).getBalancerBandwidth());
|
|
|
- assertEquals(newBandwidth, (long) datanodes.get(1).getBalancerBandwidth());
|
|
|
// verifies the dfsadmin command execution
|
|
|
args = new String[] { "-getBalancerBandwidth", dn1Address };
|
|
|
- runGetBalancerBandwidthCmd(admin, args, dn1Proxy, newBandwidth);
|
|
|
+ runGetBalancerBandwidthCmd(admin, args, newBandwidth);
|
|
|
args = new String[] { "-getBalancerBandwidth", dn2Address };
|
|
|
- runGetBalancerBandwidthCmd(admin, args, dn2Proxy, newBandwidth);
|
|
|
- } finally {
|
|
|
- cluster.shutdown();
|
|
|
+ runGetBalancerBandwidthCmd(admin, args, newBandwidth);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private void verifyBalancerBandwidth(final ArrayList<DataNode> datanodes,
|
|
|
+ final long newBandwidth) throws TimeoutException, InterruptedException {
|
|
|
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
|
+ @Override
|
|
|
+ public Boolean get() {
|
|
|
+ return (long) datanodes.get(0).getBalancerBandwidth() == newBandwidth
|
|
|
+ && (long) datanodes.get(1).getBalancerBandwidth() == newBandwidth;
|
|
|
+ }
|
|
|
+ }, 100, 60 * 1000);
|
|
|
+ }
|
|
|
+
|
|
|
private void runGetBalancerBandwidthCmd(DFSAdmin admin, String[] args,
|
|
|
- ClientDatanodeProtocol proxy, long expectedBandwidth) throws Exception {
|
|
|
+ long expectedBandwidth) throws Exception {
|
|
|
PrintStream initialStdOut = System.out;
|
|
|
outContent.reset();
|
|
|
try {
|