@@ -28,16 +28,15 @@ import java.io.OutputStream;
import java.io.PrintStream;
import java.net.URI;
import java.security.SecureRandom;
-import java.util.HashMap;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import javax.management.InstanceNotFoundException;
-import javax.management.MBeanException;
import javax.management.MBeanServerConnection;
+import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
-import javax.management.ReflectionException;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
@@ -56,18 +55,12 @@ import org.apache.hadoop.hdfs.test.system.DNClient;

import org.apache.hadoop.mapreduce.test.system.MRCluster;

-import org.apache.hadoop.examples.RandomWriter;
import org.apache.hadoop.io.IOUtils;

-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.JobStatus;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapreduce.test.system.JTProtocol;
-import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.JTClient;
+import org.apache.hadoop.mapreduce.test.system.TTClient;
+import org.apache.hadoop.test.system.AbstractDaemonClient;
import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;

import org.junit.After;
import org.junit.Assert;
@@ -82,20 +75,6 @@ public class TestBalancer {
private Configuration hadoopConf;
private HDFSCluster dfsCluster;
private MRCluster mrCluster;
- // TODO don't hardwire these, introspect the cluster
- private static final String NAMENODE = "gsbl90277.blue.ygrid.yahoo.com";
- private static final String[] ENDPOINT_JMX = {
- "gsbl90277.blue.ygrid.yahoo.com-8008",
- "gsbl90276.blue.ygrid.yahoo.com-24812",
- "gsbl90275.blue.ygrid.yahoo.com-24810",
- "gsbl90274.blue.ygrid.yahoo.com-24808",
- "gsbl90273.blue.ygrid.yahoo.com-24806",
- "gsbl90272.blue.ygrid.yahoo.com-24804",
- "gsbl90271.blue.ygrid.yahoo.com-24802",
- "gsbl90270.blue.ygrid.yahoo.com-24800"
- };
- private Map<String, MBeanServerConnection> endpointMap =
- new HashMap<String, MBeanServerConnection>();

public TestBalancer() throws Exception {
}
@@ -108,7 +87,7 @@ public class TestBalancer {
//TODO no need for mr cluster anymore
mrCluster = MRCluster.createCluster(hadoopConf);
mrCluster.setUp();
- connectJMX();
+ //connectJMX();
}

@After
@@ -117,148 +96,7 @@ public class TestBalancer {
mrCluster.tearDown();
}

- /** Connect to JMX agents on HDFS cluster nodes */
- private void connectJMX() {
- final int HOST = 0;
- final int PORT = 1;
- for (String endpoint : ENDPOINT_JMX) {
- String[] toks = endpoint.split("-");
- String host = toks[HOST];
- String port = toks[PORT];
- LOG.info("HOST=" + host + ", PORT=" + port);
- MBeanServerConnection jmxEndpoint = getJMXEndpoint(host, port);
- endpointMap.put(host, jmxEndpoint);
- }
- }
-
- private long getDataNodeFreeSpace(DNClient datanode) {
- String dnHost = datanode.getHostName();
- Object volObj = getDNAttribute(dnHost, "VolumeInfo");
- Map volInfoMap = (Map) JSON.parse(volObj.toString());
- long totalFreeSpace = 0L;
- for (Object key : volInfoMap.keySet()) {
- Map attrMap = (Map) volInfoMap.get(key);
- long freeSpace = (Long) attrMap.get("freeSpace");
- //LOG.info( String.format("volume %s has %d bytes free space left", key, freeSpace) );
- totalFreeSpace += freeSpace;
- }
- //LOG.info(String.format("got from host %s volinfo:\n%s", dnHost, volObj));
- return totalFreeSpace;
- }
-
- private long getDataNodeUsedSpace(DNClient datanode) {
- String dnHost = datanode.getHostName();
- LOG.debug("checking DFS space used on host "+dnHost);
- Object volObj = getDNAttribute(dnHost, "VolumeInfo");
- LOG.debug("retrieved volume info object "+volObj);
- Map volInfoMap = (Map) JSON.parse(volObj.toString());
- long totalUsedSpace = 0L;
- for (Object key : volInfoMap.keySet()) {
- Map attrMap = (Map) volInfoMap.get(key);
- // TODO should we be using free space here?
- long usedSpace = (Long) attrMap.get("usedSpace");
- LOG.info( String.format("volume %s has %d bytes used space", key, usedSpace) );
- totalUsedSpace += usedSpace;
- }
- //LOG.info(String.format("got from host %s volinfo:\n%s", dnHost, volObj));
- return totalUsedSpace;
- }
-
- // TODO just throw the dang exceptions
- private Object getDNAttribute(String host, String attribName) {
- ObjectName name = null;
- Object attribVal = null;
- try {
- MBeanServerConnection conn = endpointMap.get(host);
- name = new ObjectName("HadoopInfo:type=DataNodeInfo");
- attribVal = conn.getAttribute(name, attribName);
- } catch (javax.management.AttributeNotFoundException attribNotFoundExc) {
- LOG.warn(String.format("no attribute matching %s found", attribName),
- attribNotFoundExc);
- } catch (javax.management.MalformedObjectNameException badObjNameExc) {
- LOG.warn("bad object name: " + name, badObjNameExc);
- } catch (javax.management.InstanceNotFoundException instNotFoundExc) {
- LOG.warn("no MBean instance found", instNotFoundExc);
- } catch (javax.management.ReflectionException reflectExc) {
- LOG.warn("reflection error!", reflectExc);
- } catch (javax.management.MBeanException mBeanExc) {
- LOG.warn("MBean error!", mBeanExc);
- } catch (java.io.IOException ioExc) {
- LOG.debug("i/o error!", ioExc);
- }
- return attribVal;
- }
- //@Test
-
- public void testJMXRemote() {
- final int HOST = 0;
- final int PORT = 1;
- for (String endpoint : ENDPOINT_JMX) {
- String[] toks = endpoint.split("-");
- String host = toks[HOST];
- String port = toks[PORT];
- //LOG.info("HOST="+host+", PORT="+port);
- MBeanServerConnection jmxEndpoint = getJMXEndpoint(host, port);
- endpointMap.put(host, jmxEndpoint);
- }
-
-
- Iterator<String> iter = endpointMap.keySet().iterator();
- while (iter.hasNext()) {
- String host = iter.next();
- MBeanServerConnection conn = endpointMap.get(host);
- ObjectName mBeanName = null;
- try {
- if (NAMENODE.equals(host)) {
- // TODO make this a constant
- mBeanName = new ObjectName("HadoopInfo:type=NameNodeInfo");
- } else {
- mBeanName = new ObjectName("HadoopInfo:type=DataNodeInfo");
- }
- Object versionObj = conn.getAttribute(mBeanName, "Version");
- LOG.info("host [" + host + "] runs version " + versionObj);
- } catch (javax.management.AttributeNotFoundException attribNotFoundExc) {
- // TODO don't hard-wire attrib name
- LOG.warn("no attribute matching `Version' found", attribNotFoundExc);
- } catch (javax.management.MalformedObjectNameException badObjNameExc) {
- LOG.warn("bad object name: " + mBeanName, badObjNameExc);
- } catch (javax.management.InstanceNotFoundException instNotFoundExc) {
- LOG.warn("no MBean instance found", instNotFoundExc);
- } catch (javax.management.ReflectionException reflectExc) {
- LOG.warn("reflection error!", reflectExc);
- } catch (javax.management.MBeanException mBeanExc) {
- LOG.warn("MBean error!", mBeanExc);
- } catch (java.io.IOException ioExc) {
- LOG.debug("i/o error!", ioExc);
- }
- }
- }
-
- private MBeanServerConnection getJMXEndpoint(String host, String port) {
- MBeanServerConnection conn = null;
- String urlPattern = null;
- try {
- urlPattern =
- "service:jmx:rmi:///jndi/rmi://"
- + host + ":"
- + port
- + "/jmxrmi";
- JMXServiceURL url = new JMXServiceURL(urlPattern);
- JMXConnector connector = JMXConnectorFactory.connect(url);
- conn = connector.getMBeanServerConnection();
- } catch (java.net.MalformedURLException badURLExc) {
- LOG.debug("bad url: " + urlPattern, badURLExc);
- } catch (java.io.IOException ioExc) {
- LOG.debug("i/o error!", ioExc);
- }
- return conn;
- }
- /* debug--
- public void testHello() {
- LOG.info("hello!");
- }*/
-
- //@Test
+ // Trivial @Test
public void testNameNodePing() throws IOException {
LOG.info("testing filesystem ping");
NNClient namenode = dfsCluster.getNNClient();
@@ -266,7 +104,7 @@ public class TestBalancer {
LOG.info("done.");
}

- //@Test
+ // Trivial @Test
public void testNameNodeConnectDisconnect() throws IOException {
LOG.info("connecting to namenode");
NNClient namenode = dfsCluster.getNNClient();
@@ -294,7 +132,7 @@ public class TestBalancer {
List<DNClient> testDNList = null;
Path balancerTempDir = null;
try {
- DNClient[] datanodes = getReserveDNs();
+ DNClient[] datanodes = getReserveDataNodes();
DNClient datanode1 = datanodes[0];
DNClient datanode2 = datanodes[1];

@@ -304,9 +142,8 @@ public class TestBalancer {
while (iter.hasNext()) {
try {
DNClient dn = iter.next();
- // TODO kill doesn't work anymore
- // TODO do a ssh to admin gateway and sudo yinst with command text do down a specific datanode
- stopDN( dn );
+ // kill doesn't work with secure-HDFS, so using our stopDataNode() method
+ stopDataNode( dn );
i++;
} catch (Exception e) {
LOG.info("error shutting down node " + i + ": " + e);
@@ -315,37 +152,55 @@ public class TestBalancer {

LOG.info("attempting to kill both test nodes");
// TODO add check to make sure there is enough capacity on these nodes to run test
- stopDN(datanode1);
- stopDN(datanode2);
+ stopDataNode(datanode1);
+ stopDataNode(datanode2);

LOG.info("starting up datanode ["+
datanode1.getHostName()+
"] and loading it with data");
- startDN(datanode1);
-
- LOG.info("datanode " + datanode1.getHostName()
- + " contains " + getDataNodeUsedSpace(datanode1) + " bytes");
+ startDataNode(datanode1);
+ // TODO make an appropriate JMXListener interface
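+ // the listener wraps a JMX connection to this datanode's DataNodeInfo MBean (see JMXListenerBean below)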
+ JMXListenerBean lsnr1 = JMXListenerBean.listenForDataNodeInfo(datanode1);
+
// mkdir balancer-temp
balancerTempDir = makeTempDir();
// TODO write 2 blocks to file system
LOG.info("generating filesystem load");
- generateFSLoad(2); // generate 2 blocks of test data
+ // TODO spec blocks to generate by blockCount, blockSize, # of writers
+ generateFileSystemLoad(2); // generate 2 blocks of test data

LOG.info("measure space used on 1st node");
- long usedSpace0 = getDataNodeUsedSpace(datanode1);
+ long usedSpace0 = lsnr1.getDataNodeUsedSpace();
LOG.info("datanode " + datanode1.getHostName()
+ " contains " + usedSpace0 + " bytes");

LOG.info("bring up a 2nd node and run balancer on DFS");
- startDN(datanode2);
+ startDataNode(datanode2);
runBalancer();

+ JMXListenerBean lsnr2 = JMXListenerBean.listenForDataNodeInfo(datanode2);
LOG.info("measure blocks and files on both nodes, assert these "
+ "counts are identical pre- and post-balancer run");
- long usedSpace1 = getDataNodeUsedSpace(datanode1);
- long usedSpace2 = getDataNodeUsedSpace(datanode2);
- Assert.assertEquals(usedSpace0, usedSpace1 + usedSpace2);
-
+ long usedSpace1 = lsnr1.getDataNodeUsedSpace();
+ long usedSpace2 = lsnr2.getDataNodeUsedSpace();
+ long observedValue = usedSpace1 + usedSpace2;
+ long expectedValue = usedSpace0;
+ int errorTolerance = 10;
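+ // allow the combined post-balancer used space to drift from the pre-balancer figure by up to errorTolerance percent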
+ double toleranceValue = expectedValue * (errorTolerance/100.0);
+ String assertMsg =
+ String.format(
+ "The observed used space [%d] exceeds the expected "+
+ "used space [%d] by more than %d%% tolerance [%.2f]",
+ observedValue, expectedValue,
+ errorTolerance, toleranceValue );
+ Assert.assertTrue(
+ assertMsg,
+ withinTolerance(expectedValue, observedValue, errorTolerance) );
+ LOG.info( String.format(
+ "The observed used space [%d] approximates expected "+
+ "used space [%d] within %d%% tolerance [%.2f]",
+ observedValue, expectedValue,
+ errorTolerance, toleranceValue) );
} catch (Throwable t) {
LOG.info("method testBalancer failed", t);
} finally {
@@ -362,13 +217,13 @@ public class TestBalancer {

while (iter.hasNext()) {
DNClient dn = iter.next();
- startDN( dn );
+ startDataNode( dn );
}
}
}
- /* Kill all datanodes but 2, return a list of the reserved datanodes */

- private DNClient[] getReserveDNs() {
+ /** Kill all datanodes but 2, return a list of the reserved datanodes */
+ private DNClient[] getReserveDataNodes() {
List<DNClient> testDNs = new LinkedList<DNClient>();
List<DNClient> dieDNs = new LinkedList<DNClient>();
LOG.info("getting collection of live data nodes");
@@ -401,10 +256,10 @@ public class TestBalancer {
dieDNs.remove(testDN);

LOG.info("nodes reserved for test");
- printDNList(testDNs);
+ printDataNodeList(testDNs);

LOG.info("nodes not used in test");
- printDNList(dieDNs);
+ printDataNodeList(dieDNs);

DNClient[] arr = new DNClient[]{};
return (DNClient[]) testDNs.toArray(arr);
@@ -421,6 +276,21 @@ public class TestBalancer {
return (int) (n * Math.random());
}

+ /**
+ * Determine whether the difference between the expected and observed values is within tolerance
+ *
+ * @param expectedValue expected value of experiment
+ * @param observedValue observed value of experiment
+ * @param tolerance percent tolerance for error, represented as an int
+ */
+ private boolean withinTolerance(long expectedValue,
+ long observedValue,
+ int tolerance) {
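+ // the values agree when |observed - expected| <= expected * (tolerance / 100)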
+ double diff = 1.0 * Math.abs(observedValue - expectedValue);
+ double thrs = expectedValue * (tolerance / 100.0);
+ return diff <= thrs;
+ }
+
/**
* Make a working directory for storing temporary files
*
@@ -459,28 +329,27 @@ public class TestBalancer {
srcFs.delete(temp, true);
}

- private void printDNList(List<DNClient> lis) {
+ private void printDataNodeList(List<DNClient> lis) {
for (DNClient datanode : lis) {
LOG.info("\t" + datanode.getHostName());
}
}

private final static String CMD_STOP_DN = "sudo yinst stop hadoop_datanode_admin";
- private void stopDN(DNClient dn) {
+ private void stopDataNode(DNClient dn) {
String dnHost = dn.getHostName();
runAndWatch(dnHost, CMD_STOP_DN);
}
private final static String CMD_START_DN = "sudo yinst start hadoop_datanode_admin";
- private void startDN(DNClient dn) {
+ private void startDataNode(DNClient dn) {
String dnHost = dn.getHostName();
runAndWatch(dnHost, CMD_START_DN);
}

/* using "old" default block size of 64M */
- private
- static final int DFS_BLOCK_SIZE = 67108864;
+ private static final int DFS_BLOCK_SIZE = 67108864;

- private void generateFSLoad(int numBlocks) {
+ private void generateFileSystemLoad(int numBlocks) {
String destfile = "hdfs:///user/hadoopqa/" + BALANCER_TEMP_DIR + "/LOADGEN.DAT";
SecureRandom randgen = new SecureRandom();
ByteArrayOutputStream dat = null;
@@ -520,8 +389,6 @@ public class TestBalancer {
public final static String CMD_KINIT = "/usr/kerberos/bin/kinit";
public final static String CMD_HADOOP = HADOOP_HOME + "/bin/hadoop";
public final static String OPT_BALANCER = "balancer";
- // NOTE this shouldn't be hardwired
- public final static String HOST_NAMENODE = "gsbl90277.blue.ygrid.yahoo.com";
public final static String KERB_KEYTAB = "/homes/hadoopqa/hadoopqa.dev.headless.keytab";
public final static String KERB_PRINCIPAL = "hadoopqa@DEV.YGRID.YAHOO.COM";
@@ -553,10 +420,228 @@ public class TestBalancer {
new Thread(new StreamWatcher(in, out)).start();
}

+ static class JMXListenerBean {
+
+ static final String OPTION_REMOTE_PORT = "-Dcom.sun.management.jmxremote.port";
+ static final String HADOOP_JMX_SERVICE_NAME = "HadoopInfo";
+ static final String HADOOP_JMX_INFO_DATANODE = "DataNodeInfo";
+
+ public static JMXListenerBean listenFor(
+ AbstractDaemonClient remoteDaemon,
+ String typeName)
+ throws
+ java.io.IOException,
+ InstanceNotFoundException {
+ String hostName = remoteDaemon.getHostName();
+ int portNum = getJmxPortNumber(remoteDaemon);
+ ObjectName jmxBeanName = getJmxBeanName(typeName);
+ return new JMXListenerBean(hostName, portNum, jmxBeanName);
+ }
+
+ public static JMXListenerBean listenForDataNodeInfo(
+ AbstractDaemonClient remoteDaemon)
+ throws
+ java.io.IOException,
+ InstanceNotFoundException {
+ return listenFor(remoteDaemon, HADOOP_JMX_INFO_DATANODE);
+ }
+
+ private static int getJmxPortNumber(AbstractDaemonClient daemon) throws java.io.IOException {
+ String hadoopOpts = daemon.getProcessInfo().getEnv().get("HADOOP_OPTS");
+ int portNumber = 0;
+ boolean found = false;
+ String[] options = hadoopOpts.split(" ");
+ for(String opt : options) {
+ if(opt.startsWith(OPTION_REMOTE_PORT)) {
+ found = true;
+ try {
+ portNumber = Integer.parseInt(opt.split("=")[1]);
+ } catch(NumberFormatException numFmtExc) {
+ throw new IllegalArgumentException("JMX remote port is not an integer");
+ } catch(ArrayIndexOutOfBoundsException outOfBoundsExc) {
+ throw new IllegalArgumentException("JMX remote port not found");
+ }
+ }
+ }
+ if (!found) {
+ String errMsg =
+ String.format("Cannot detect JMX remote port for %s daemon on host %s",
+ getDaemonType(daemon),
+ daemon.getHostName());
+ throw new IllegalArgumentException(errMsg);
+ }
+ return portNumber;
+ }
+
+ private static String getDaemonType(AbstractDaemonClient daemon) {
+ Class daemonClass = daemon.getClass();
+ if (daemonClass.equals(DNClient.class))
+ return "datanode";
+ else if (daemonClass.equals(TTClient.class))
+ return "tasktracker";
+ else if (daemonClass.equals(NNClient.class))
+ return "namenode";
+ else if (daemonClass.equals(JTClient.class))
+ return "jobtracker";
+ else
+ return "unknown";
+ }
+
+ private MBeanServerConnection establishJmxConnection() {
+ MBeanServerConnection conn = null;
+ String urlPattern = String.format(
+ "service:jmx:rmi:///jndi/rmi://%s:%s/jmxrmi",
+ hostName, portNumber );
+ try {
+ JMXServiceURL url = new JMXServiceURL(urlPattern);
+ JMXConnector connector = JMXConnectorFactory.connect(url,null);
+ conn = connector.getMBeanServerConnection();
+ } catch(java.net.MalformedURLException badURLExc) {
+ LOG.debug("bad url: "+urlPattern, badURLExc);
+ } catch(java.io.IOException ioExc) {
+ LOG.debug("i/o error!", ioExc);
+ }
+ return conn;
+ }
+
+ private static ObjectName getJmxBeanName(String typeName) {
+ ObjectName jmxBean = null;
+ String jmxRef = String.format(
+ "%s:type=%s",
+ HADOOP_JMX_SERVICE_NAME, typeName);
+ try {
+ jmxBean = new ObjectName(jmxRef);
+ } catch(MalformedObjectNameException badObjNameExc) {
+ LOG.debug("bad jmx name: "+jmxRef, badObjNameExc);
+ }
+ return jmxBean;
+ }
+
+ private String hostName;
+ private int portNumber;
+ private ObjectName beanName;
+
+ private JMXListenerBean(String hostName, int portNumber, ObjectName beanName)
+ throws
+ IOException,
+ InstanceNotFoundException {
+ //this.conn = conn;
+ this.hostName = hostName;
+ this.portNumber = portNumber;
+ this.beanName = beanName;
+ }
+
+ private Object getAttribute(String attribName)
+ throws
+ javax.management.AttributeNotFoundException,
+ javax.management.InstanceNotFoundException,
+ javax.management.ReflectionException,
+ javax.management.MBeanException,
+ java.io.IOException {
+
+ MBeanServerConnection conn = establishJmxConnection();
+ return conn.getAttribute(beanName, attribName);
+ }
+
+ private final static String TITLE_UBAR;
+ private final static String TOTAL_OBAR;
+ static {
+ char[] ubar1 = new char[100];
+ Arrays.fill(ubar1, '=');
+ TITLE_UBAR = new String(ubar1);
+ Arrays.fill(ubar1, '-');
+ TOTAL_OBAR = new String(ubar1);
+ }
+
+ private void printVolInfo(Map volInfoMap) {
+ StringBuilder bldr = new StringBuilder();
+ if (LOG.isDebugEnabled()) {
+ String spaceType = (String)volInfoMap.get("spaceType");
+ String spaceTypeHeader = "Space ";
+ if(spaceType.startsWith("used")) {
+ spaceTypeHeader += "Used";
+ } else {
+ spaceTypeHeader += "Free";
+ }
+ String titleLine = String.format(
+ "%30s\t%20s\n%30s\t%20s\n",
+ "Volume", spaceTypeHeader, TITLE_UBAR, TITLE_UBAR);
+ bldr.append( titleLine );
+ for (Object key : volInfoMap.keySet()) {
+ if ("total".equals(key) || "spaceType".equals(key))
+ continue;
+
+ Map attrMap = (Map) volInfoMap.get(key);
+ long usedSpace = (Long) attrMap.get(spaceType);
+ bldr.append(String.format("%30s\t%20s\n",key,usedSpace));
+ }
+ String totalLine = String.format(
+ "%30s\t%20s\n%30s\t%20s",
+ TOTAL_OBAR, TOTAL_OBAR, "Total", volInfoMap.get("total"));
+ bldr.append(totalLine);
+ LOG.debug( bldr.toString() );
+ }
+ }
+
+ public Map processVolInfo(String spaceType)
+ throws
+ javax.management.AttributeNotFoundException,
+ javax.management.InstanceNotFoundException,
+ javax.management.ReflectionException,
+ javax.management.MBeanException,
+ java.io.IOException {
+
+ Object volInfo = getAttribute("VolumeInfo");
+ LOG.debug("retrieved volume info object " + volInfo);
+ Map info = (Map) JSON.parse(volInfo.toString());
+ long total = 0L;
+ for (Object key : info.keySet()) {
+ Map attrMap = (Map) info.get(key);
+ long volAlloc = (Long) attrMap.get(spaceType);
+ LOG.info(String.format("volume %s reports %d bytes for %s", key, volAlloc, spaceType));
+ total += volAlloc;
+ }
+ info.put("total", total);
+ info.put("spaceType", spaceType);
+ return info;
+ }
+
+ public long getDataNodeUsedSpace()
+ throws
+ javax.management.AttributeNotFoundException,
+ javax.management.InstanceNotFoundException,
+ javax.management.ReflectionException,
+ javax.management.MBeanException,
+ java.io.IOException {
+
+ LOG.debug("checking DFS space used on host " + hostName);
+ Map volInfoMap = processVolInfo("usedSpace");
+ printVolInfo(volInfoMap);
+ long totalUsedSpace = Long.parseLong(volInfoMap.get("total").toString());
+ return totalUsedSpace;
+ }
+
+ public long getDataNodeFreeSpace()
+ throws
+ javax.management.AttributeNotFoundException,
+ javax.management.InstanceNotFoundException,
+ javax.management.ReflectionException,
+ javax.management.MBeanException,
+ java.io.IOException {
+
+ LOG.debug("checking DFS space free on host " + hostName);
+ Map volInfoMap = processVolInfo("freeSpace");
+ printVolInfo(volInfoMap);
+ long totalFreeSpace = Long.parseLong(volInfoMap.get("total").toString());
+ return totalFreeSpace;
+ }
+ }
+
+ /** simple utility to watch streams from an exec'ed process */
static class StreamWatcher implements Runnable {

- BufferedReader reader;
- PrintStream printer;
+ private BufferedReader reader;
+ private PrintStream printer;

StreamWatcher(InputStream in, PrintStream out) {
reader = getReader(in);
@@ -577,6 +662,7 @@ public class TestBalancer {
}
}

+ /** simple utility to report progress in generating data */
static class ProgressReporter implements Progressable {

StringBuffer buf = null;