@@ -27,22 +27,21 @@ import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.URI;
+
import java.security.SecureRandom;
-import java.util.Arrays;
+
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import javax.management.InstanceNotFoundException;
-import javax.management.MBeanServerConnection;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-import javax.management.remote.JMXConnector;
-import javax.management.remote.JMXConnectorFactory;
-import javax.management.remote.JMXServiceURL;
+import java.util.Set;
+
+import org.apache.commons.lang.ArrayUtils;

import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -53,13 +52,8 @@ import org.apache.hadoop.hdfs.test.system.HDFSCluster;
import org.apache.hadoop.hdfs.test.system.NNClient;
import org.apache.hadoop.hdfs.test.system.DNClient;

-import org.apache.hadoop.mapreduce.test.system.MRCluster;

import org.apache.hadoop.io.IOUtils;
-
-import org.apache.hadoop.mapreduce.test.system.JTClient;
-import org.apache.hadoop.mapreduce.test.system.TTClient;
-import org.apache.hadoop.test.system.AbstractDaemonClient;
import org.apache.hadoop.util.Progressable;

import org.junit.After;
@@ -74,7 +68,6 @@ public class TestBalancer {
private static final String BALANCER_TEMP_DIR = "balancer-temp";
private Configuration hadoopConf;
private HDFSCluster dfsCluster;
- private MRCluster mrCluster;

public TestBalancer() throws Exception {
}
@@ -84,16 +77,11 @@ public class TestBalancer {
hadoopConf = new Configuration();
dfsCluster = HDFSCluster.createCluster(hadoopConf);
dfsCluster.setUp();
- //TODO no need for mr cluster anymore
- mrCluster = MRCluster.createCluster(hadoopConf);
- mrCluster.setUp();
- //connectJMX();
}

@After
public void tearDown() throws Exception {
dfsCluster.tearDown();
- mrCluster.tearDown();
}

// Trivial @Test
@@ -128,78 +116,37 @@ public class TestBalancer {
*/
@Test
public void testBalancerBasicScenario() throws IOException {
- List<DNClient> killDNList = null;
- List<DNClient> testDNList = null;
Path balancerTempDir = null;
try {
- DNClient[] datanodes = getReserveDatanodes();
- DNClient datanode1 = datanodes[0];
- DNClient datanode2 = datanodes[1];
-
- LOG.info("attempting to kill/suspend all the nodes not used for this test");
- Iterator<DNClient> iter = dfsCluster.getDNClients().iterator();
- int i = 0;
- while (iter.hasNext()) {
- try {
- DNClient dn = iter.next();
- // kill doesn't work with secure-HDFS, so using our stopDataNode() method
- stopDatanode( dn );
- i++;
- } catch (Exception e) {
- LOG.info("error shutting down node " + i + ": " + e);
- }
- }
+ List<DNClient> testnodes = reserveDatanodesForTest(2);
+ DNClient testnode1 = testnodes.get(0);
+ DNClient testnode2 = testnodes.get(1);
+ shutdownNonTestNodes(testnodes);

LOG.info("attempting to kill both test nodes");
- // TODO add check to make sure there is enough capacity on these nodes to run test
- stopDatanode(datanode1);
- stopDatanode(datanode2);
+ stopDatanode(testnode1);
+ stopDatanode(testnode2);

LOG.info("starting up datanode ["+
- datanode1.getHostName()+
+ testnode1.getHostName()+
"] and loading it with data");
- startDatanode(datanode1);
+ startDatanode(testnode1);

// mkdir balancer-temp
balancerTempDir = makeTempDir();
- // TODO write 2 blocks to file system
+ // write 2 blocks to file system
LOG.info("generating filesystem load");
// TODO spec blocks to generate by blockCount, blockSize, # of writers
generateFileSystemLoad(2); // generate 2 blocks of test data

LOG.info("measure space used on 1st node");
- long usedSpace0 = getDatanodeUsedSpace(datanode1);
- LOG.info("datanode " + datanode1.getHostName()
+ long usedSpace0 = getDatanodeUsedSpace(testnode1);
+ LOG.info("datanode " + testnode1.getHostName()
+ " contains " + usedSpace0 + " bytes");

LOG.info("bring up a 2nd node and run balancer on DFS");
- startDatanode(datanode2);
- runBalancer();
-
- //JMXListenerBean lsnr2 = JMXListenerBean.listenForDataNodeInfo(datanode2);
-
- LOG.info("measure blocks and files on both nodes, assert these "
- + "counts are identical pre- and post-balancer run");
- long usedSpace1 = getDatanodeUsedSpace(datanode1);
- long usedSpace2 = getDatanodeUsedSpace(datanode2);
- long observedValue = usedSpace1 + usedSpace2;
- long expectedValue = usedSpace0;
- int errorTolerance = 10;
- double toleranceValue = expectedValue * (errorTolerance/100.0);
- String assertMsg =
- String.format(
- "The observed used space [%d] exceeds the expected "+
- "used space [%d] by more than %d%% tolerance [%.2f]",
- observedValue, expectedValue,
- errorTolerance, toleranceValue );
- Assert.assertTrue(
- assertMsg,
- withinTolerance(expectedValue, observedValue, errorTolerance) );
- LOG.info( String.format(
- "The observed used space [%d] approximates expected "+
- "used space [%d] within %d%% tolerance [%.2f]",
- observedValue, expectedValue,
- errorTolerance, toleranceValue) );
+ startDatanode(testnode2);
+ runBalancerAndVerify(testnodes);
} catch (Throwable t) {
LOG.info("method testBalancer failed", t);
} finally {
@@ -221,23 +168,38 @@ public class TestBalancer {
}
}

- /** Kill all datanodes but 2, return a list of the reserved datanodes */
- private DNClient[] getReserveDatanodes() {
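+ /** Stop every datanode in the cluster except those reserved for the test. */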
+ private void shutdownNonTestNodes(List<DNClient> testnodes) {
+ Set<DNClient> killSet = new HashSet<DNClient>(getAllDatanodes());
+ killSet.removeAll(testnodes);
+ LOG.info("attempting to kill/suspend all the nodes not used for this test");
+ Iterator<DNClient> iter = killSet.iterator();
+ DNClient dn = null;
+ while (iter.hasNext()) {
+ dn = iter.next();
+ // kill may not work with some secure-HDFS configs,
+ // so using our stopDataNode() method
+ stopDatanode(dn);
+ }
+ }
+
+ /**
+ * Kill all datanodes but leave reservationCount nodes alive,
+ * return a list of the reserved datanodes
+ */
+ private List<DNClient> reserveDatanodesForTest(int reservationCount) {
List<DNClient> testDNs = new LinkedList<DNClient>();
List<DNClient> dieDNs = new LinkedList<DNClient>();
LOG.info("getting collection of live data nodes");
- NNClient namenode = dfsCluster.getNNClient();
- List<DNClient> dnList = dfsCluster.getDNClients();
+ List<DNClient> dnList = getAllDatanodes();
int dnCount = dnList.size();
- if (dnList.size() < 2) {
- // TODO throw a non-RuntimeException here instead
- String msg = String.format(
+ // check that enough datanodes are available to run the test
+ Assert.assertTrue(
+ String.format(
"not enough datanodes available to run test,"
- + " need 2 datanodes but have only %d available",
- dnCount);
- throw new RuntimeException(msg);
- }
- LOG.info("selecting 2 nodes for test");
+ + " need %d datanodes but have only %d available",
+ reservationCount, dnCount),
+ ( dnCount >= reservationCount ));
+ LOG.info("selecting "+reservationCount+" nodes for test");
dieDNs = new LinkedList<DNClient>(dnList);
testDNs = new LinkedList<DNClient>();
@@ -260,8 +222,16 @@ public class TestBalancer {
LOG.info("nodes not used in test");
printDatanodeList(dieDNs);

- DNClient[] arr = new DNClient[]{};
- return (DNClient[]) testDNs.toArray(arr);
+ return testDNs;
+ }
+
+ private List<DNClient> getAllDatanodes() {
+ return dfsCluster.getDNClients();
+ }
+
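+ // shared zero-length array for List.toArray(); toArray allocates a
+ // correctly sized DNClient[] whenever the list is non-empty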
+ private final static DNClient[] DATANODE_ARRAY = {};
+ private DNClient[] toDatanodeArray(List<DNClient> datanodeList) {
+ return datanodeList.toArray(DATANODE_ARRAY);
}

/**
@@ -290,6 +260,45 @@ public class TestBalancer {
return diff > thrs;
}

+ // emulate tolerance calculation in balancer code
+ public final static int DEFAULT_TOLERANCE = 10; // 10%
+ protected boolean isClusterBalanced(DNClient[] datanodes) throws IOException {
+ return isClusterBalanced(datanodes, DEFAULT_TOLERANCE);
+ }
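+ /**
+ * A cluster is considered balanced when each node's utilization (used
+ * space as a percentage of its capacity) is within tolerance percentage
+ * points of the average utilization across the given nodes.
+ */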
+ protected boolean isClusterBalanced(DNClient[] datanodes, int tolerance)
+ throws IOException {
+
+ Assert.assertFalse("empty datanode array specified",
+ ArrayUtils.isEmpty(datanodes));
+ boolean result = true;
+ double[] utilizationByNode = new double[ datanodes.length ];
+ double totalUsedSpace = 0L;
+ double totalCapacity = 0L;
+ // accumulate space stored on each node
+ for(int i=0; i<datanodes.length; i++) {
+ DNClient datanode = datanodes[i];
+ Map volumeInfoMap = getDatanodeVolumeAttributes(datanode);
+ long usedSpace = (Long)volumeInfoMap.get(ATTRNAME_USED_SPACE);
+ long capacity = (Long)volumeInfoMap.get(ATTRNAME_CAPACITY);
+ utilizationByNode[i] = ( ((double)usedSpace)/capacity ) * 100;
+ totalUsedSpace += usedSpace;
+ totalCapacity += capacity;
+ }
+ // here we are reusing previously fetched volume-info, for speed;
+ // an alternative is to get fresh values from the cluster here instead
+ double avgUtilization = ( totalUsedSpace/totalCapacity ) * 100;
+ for(int i=0; i<datanodes.length; i++) {
+ double varUtilization = Math.abs(avgUtilization - utilizationByNode[i]);
+ if(varUtilization > tolerance) {
+ result = false;
+ break;
+ }
+ }
+
+ return result;
+ }
+
/**
* Make a working directory for storing temporary files
*
@@ -347,17 +356,26 @@ public class TestBalancer {

/* using "old" default block size of 64M */
private static final int DFS_BLOCK_SIZE = 67108864;
-
- private void generateFileSystemLoad(int numBlocks) {
- String destfile = "hdfs:///user/hadoopqa/" + BALANCER_TEMP_DIR + "/LOADGEN.DAT";
+ private static final short DEFAULT_REPLICATION = 3;
+ private void generateFileSystemLoad(long numBlocks) {
+ generateFileSystemLoad(numBlocks, DEFAULT_REPLICATION);
+ }
+ private void generateFileSystemLoad(long numBlocks, short replication) {
+ String destfile = "hdfs:///user/hadoopqa/" + BALANCER_TEMP_DIR + "/LOADGEN.DAT";
SecureRandom randgen = new SecureRandom();
ByteArrayOutputStream dat = null;
ByteArrayInputStream in = null;
final int CHUNK = 4096;
+ final Configuration testConf = new Configuration(hadoopConf);
try {
+ testConf.setInt("dfs.replication", replication);
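+ // each pass writes one full DFS block of random bytes to destfile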
for (int i = 0; i < numBlocks; i++) {
- FileSystem fs = FileSystem.get(URI.create(destfile), hadoopConf);
- OutputStream out = fs.create(new Path(destfile), new ProgressReporter());
+ FileSystem fs = FileSystem.get(
+ URI.create(destfile), testConf);
+ OutputStream out = fs.create(
+ new Path(destfile),
+ replication,
+ new ProgressReporter());
dat = new ByteArrayOutputStream(DFS_BLOCK_SIZE);
for (int z = 0; z < DFS_BLOCK_SIZE; z += CHUNK) {
byte[] bytes = new byte[CHUNK];
@@ -391,36 +409,99 @@ public class TestBalancer {
public final static String KERB_KEYTAB = "/homes/hadoopqa/hadoopqa.dev.headless.keytab";
public final static String KERB_PRINCIPAL = "hadoopqa@DEV.YGRID.YAHOO.COM";

- private void runBalancer() throws IOException {
- String balancerCommand = String.format("\"%s -k -t %s %s; %s %s",
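+ // threshold: how far (in percentage points) each node's utilization may
+ // deviate from the cluster average once balancing completes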
+ public final static int DEFAULT_THRESHOLD = 10;
+ private int runBalancer() throws IOException {
+ return runBalancer(DEFAULT_THRESHOLD);
+ }
+
+ private int runBalancer(int threshold) throws IOException {
+ return runBalancer(""+threshold);
+ }
+ /*
+ * TODO change the heap size balancer uses so it can run on gateways
+ * i.e., 14G heap is too big for gateways
+ */
+ private int runBalancer(String threshold)
+ throws IOException {
+
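+ // kinit first so the balancer can run against a Kerberos-secured cluster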
+ String balancerCommand = String.format("\"%s -k -t %s %s; %s %s -threshold %s",
CMD_KINIT,
KERB_KEYTAB,
KERB_PRINCIPAL,
CMD_HADOOP,
- OPT_BALANCER);
+ OPT_BALANCER,
+ threshold);
String nnHost = dfsCluster.getNNClient().getHostName();
- runAndWatch(nnHost, balancerCommand);
+ return runAndWatch(nnHost, balancerCommand);
+ }
+ private void runBalancerAndVerify(List<DNClient> testnodes)
+ throws IOException {
+ runBalancerAndVerify(testnodes, DEFAULT_THRESHOLD);
+ }
+ private void runBalancerAndVerify(List<DNClient> testnodes, int threshold)
+ throws IOException {
+ runBalancerAndVerify(testnodes, ""+threshold);
+ }
+ private void runBalancerAndVerify(List<DNClient> testnodes, String threshold)
+ throws IOException {
+ int exitStatus = runBalancer(threshold);
+ // assert balancer exits with status SUCCESS
+ Assert.assertTrue(
+ String.format("balancer returned non-success exit code: %d",
+ exitStatus),
+ (exitStatus == SUCCESS));
+ DNClient[] testnodeArr = toDatanodeArray(testnodes);
+ Assert.assertTrue(
+ "cluster is not balanced",
+ isClusterBalanced(testnodeArr));
}

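+ // run a command on a remote host via ssh, streaming its stdout/stderr;
+ // returns the remote exit status, or -1 if the wait was interrupted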
+ private int runAndWatch(String remoteHost, String remoteCommand) {
+ int exitStatus = -1;
try {
Process proc = new ProcessBuilder(CMD_SSH, remoteHost, remoteCommand).start();
watchProcStream(proc.getInputStream(), System.out);
watchProcStream(proc.getErrorStream(), System.err);
- int exitVal = proc.waitFor();
+ exitStatus = proc.waitFor();
} catch(InterruptedException intExc) {
LOG.warn("got thread interrupt error", intExc);
} catch(IOException ioExc) {
LOG.warn("got i/o error", ioExc);
}
+ return exitStatus;
}

private void watchProcStream(InputStream in, PrintStream out) {
new Thread(new StreamWatcher(in, out)).start();
}
private static final String DATANODE_VOLUME_INFO = "VolumeInfo";
- private static final String ATTRNAME_USED_SPACE="usedSpace";
- private long getDatanodeUsedSpace(DNClient datanode) throws IOException {
+ private static final String ATTRNAME_USED_SPACE = "usedSpace";
+ private static final String ATTRNAME_FREE_SPACE = "freeSpace";
+ // pseudo attribute, JMX doesn't really provide this
+ private static final String ATTRNAME_CAPACITY = "capacity";
+ // TODO maybe the static methods below belong in some utility class...
+ private static long getDatanodeUsedSpace(DNClient datanode)
+ throws IOException {
+ return (Long)getDatanodeVolumeAttributes(datanode).get(ATTRNAME_USED_SPACE);
+ }
+ /*
+ private static long getDatanodeFreeSpace(DNClient datanode)
+ throws IOException {
+ return (Long)getDatanodeVolumeAttributes(datanode).get(ATTRNAME_FREE_SPACE);
+ }
+ */
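+ // capacity is synthesized as usedSpace + freeSpace, since VolumeInfo
+ // reports only those two per-volume attributes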
+ private static Map getDatanodeVolumeAttributes(DNClient datanode)
+ throws IOException {
+ Map result = new HashMap();
+ long usedSpace = getVolumeAttribute(datanode, ATTRNAME_USED_SPACE);
+ long freeSpace = getVolumeAttribute(datanode, ATTRNAME_FREE_SPACE);
+ result.put(ATTRNAME_USED_SPACE, usedSpace);
+ result.put(ATTRNAME_CAPACITY, usedSpace+freeSpace);
+ return result;
+ }
+
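+ // sums the named attribute over every volume reported in the datanode's
+ // VolumeInfo daemon attribute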
+ private static long getVolumeAttribute(DNClient datanode,
+ String attribName)
+ throws IOException {
+
Object volInfo = datanode.getDaemonAttribute(DATANODE_VOLUME_INFO);
Assert
.assertNotNull( String
@@ -432,15 +513,15 @@ public class TestBalancer {
DATANODE_VOLUME_INFO,
strVolInfo) );
Map volInfoMap = (Map) JSON.parse(strVolInfo);
- long totalUsedSpace = 0L;
+ long attrVal = 0L;
for(Object key: volInfoMap.keySet()) {
Map attrMap = (Map) volInfoMap.get(key);
- long usedSpace = (Long) attrMap.get(ATTRNAME_USED_SPACE);
- totalUsedSpace += usedSpace;
+ long val = (Long) attrMap.get(attribName);
+ attrVal += val;
}
- return totalUsedSpace;
- }
+ return attrVal;
+ }
/** simple utility to watch streams from an exec'ed process */
static class StreamWatcher implements Runnable {
@@ -483,25 +564,47 @@ public class TestBalancer {
}
}

- /**
+ // A constant for SUCCESS exit code
+ static final int SUCCESS = 1;
+
+ /**
* Balancer_01
* Start balancer and check if the cluster is balanced after the run.
* Cluster should end up in balanced state.
*/
@Test
public void testBalancerSimple() throws IOException {
- // run balancer on "normal"cluster cluster
- throw new UnsupportedOperationException("not implemented yet!");
+
+ DNClient[] datanodes = toDatanodeArray( getAllDatanodes() );
+ int exitStatus = runBalancer();
+ // assert on successful exit code here
+ Assert.assertTrue(
+ String.format("balancer returned non-success exit code: %d",
+ exitStatus),
+ (exitStatus == SUCCESS));
+ Assert.assertTrue( "cluster is not balanced", isClusterBalanced(datanodes) );
+
}

/**
* Balancer_02
* Test a cluster with even distribution, then a new empty node is
- * added to the cluster.
+ * added to the cluster. Here, even distribution effectively means the
+ * cluster is in "balanced" state, as bytes consumed for block allocation
+ * are evenly distributed throughout the cluster.
*/
@Test
public void testBalancerEvenDistributionWithNewNodeAdded() throws IOException {
throw new UnsupportedOperationException("not implemented yet!");
+
+ // get all nodes
+ // need to get an external reserve of nodes we can boot up
+ // to add to this cluster?
+ // HOW?
+
+ // IDEA try to steal some nodes from omega-M for now.....
+ // hmmm also need a way to give an alternate "empty-node" config
+ // to "hide" the data that may already exist on this node
}

/**
@@ -511,6 +614,8 @@ public class TestBalancer {
*/
@Test
public void testBalancerSingleNodeClusterWithNewNodeAdded() throws IOException {
+ // empty datanode: mod config to point to non-default blocks dir.
+ // limit capacity to available storage space
throw new UnsupportedOperationException("not implemented yet!");
}
@@ -522,6 +627,7 @@ public class TestBalancer {
@Test
public void testBalancerSingleNodeClusterWithNewNodeAddedFromDifferentRack()
throws IOException {
+ // need rack awareness
throw new UnsupportedOperationException("not implemented yet!");
}
@@ -532,6 +638,7 @@ public class TestBalancer {
*/
@Test
public void testBalancerSingleNodeClusterWithHalfCapacityNewNode() {
+ // how to limit node capacity?
throw new UnsupportedOperationException("not implemented yet!");
}
@@ -542,6 +649,7 @@ public class TestBalancer {
*/
@Test
public void testBalancerTwoNodeMultiRackCluster() {
+ // need rack awareness
throw new UnsupportedOperationException("not implemented yet!");
}
@@ -554,7 +662,24 @@ public class TestBalancer {
@Test
public void testBalancerTwoNodeSingleRackClusterWuthNewNodeAdded()
throws IOException {
- throw new UnsupportedOperationException("not implemented yet!");
+
+ final short TEST_REPLICATION_FACTOR = 3;
+ List<DNClient> testnodes = reserveDatanodesForTest(3);
+ DNClient dnA = testnodes.get(0);
+ DNClient dnB = testnodes.get(1);
+
+ DNClient dnC = testnodes.get(2);
+ stopDatanode(dnC);
+
+ // load the cluster to 30% above its pre-test usage, using the most
+ // heavily used of the two running nodes as the baseline
+ long targetLoad = (long) (
+ (1.0/DFS_BLOCK_SIZE) *
+ 0.30 *
+ Math.max( getDatanodeUsedSpace(dnA), getDatanodeUsedSpace(dnB) ) );
+ generateFileSystemLoad(targetLoad, TEST_REPLICATION_FACTOR);
+ startDatanode(dnC);
+ runBalancerAndVerify(testnodes);
}

/**
@@ -564,6 +689,7 @@ public class TestBalancer {
@Test
public void testBalancerTwoNodeMultiRackClusterWithNewNodeAdded()
throws IOException {
+ // need rack awareness
throw new UnsupportedOperationException("not implemented yet!");
}
@@ -574,6 +700,7 @@ public class TestBalancer {
@Test
public void testBalancerTwoNodeSingleRackClusterInterruptingRebalance()
throws IOException {
+ // interrupt thread
throw new UnsupportedOperationException("not implemented yet!");
}
@@ -584,6 +711,7 @@ public class TestBalancer {
@Test
public void testBalancerRestartInterruptedBalancerUntilDone()
throws IOException {
+ // need kill-restart thread
throw new UnsupportedOperationException("not implemented yet!");
}
@@ -594,6 +722,7 @@ public class TestBalancer {
@Test
public void testBalancerTwoNodeSingleRackShutdownNameNodeDuringRebalance()
throws IOException {
+ // need NN shutdown thread in addition
throw new UnsupportedOperationException("not implemented yet!");
}
@@ -605,6 +734,7 @@ public class TestBalancer {
public void
testBalancerSingleNodeClusterWithHalfCapacityNewNodeRebalanceWithConcurrentFSWrites()
throws IOException {
+ // writer thread
throw new UnsupportedOperationException("not implemented yet!");
}
@@ -615,6 +745,7 @@ public class TestBalancer {
@Test
public void testBalancerSingleNodeClusterWithHalfCapacityNewNodeRebalanceWithConcurrentFSDeletes()
throws IOException {
+ // eraser thread
throw new UnsupportedOperationException("not implemented yet!");
}
@@ -626,6 +757,7 @@ public class TestBalancer {
@Test
public void testBalancerSingleNodeClusterWithHalfCapacityNewNodeRebalanceWithConcurrentFSDeletesAndWrites()
throws IOException {
+ // writer & eraser threads
throw new UnsupportedOperationException("not implemented yet!");
}
@@ -639,6 +771,22 @@ public class TestBalancer {
*/
@Test
public void testBalancerScalability() throws IOException {
+ /* work in progress->
+ *
+ *
+ List<DNClient> dnList = getAllDatanodes();
+ int dnCount = dnList.size();
+
+ Assert.assertTrue(
+ String.format(
+ "not enough datanodes available to run test,"
+ + " need 2 datanodes but have only %d available",
+ dnCount),
+ ( dnCount == (875 - 2) ));
+
+ List<DNClient> datanodes = reserveDatanodesForTest(750);
+ shutdownNonTestNodes(datanodes);
+ */
throw new UnsupportedOperationException("not implemented yet!");
}
@@ -649,7 +797,12 @@ public class TestBalancer {
@Test
public void testBalancerConfiguredWithThresholdValueNegative()
throws IOException {
- throw new UnsupportedOperationException("not implemented yet!");
+ List<DNClient> testnodes = getAllDatanodes();
+ final int TRIALS = 5;
+ for(int i=0; i<TRIALS; i++) {
+ // pick a threshold in [-100, -1] so it is always strictly negative
+ int negThreshold = -1 - (int)(99 * Math.random());
+ runBalancerAndVerify(testnodes, negThreshold);
+ }
}

/**
@@ -660,7 +813,13 @@ public class TestBalancer {
@Test
public void testBalancerConfiguredWithThresholdValueOutOfRange()
throws IOException {
- throw new UnsupportedOperationException("not implemented yet!");
+ List<DNClient> testnodes = getAllDatanodes();
+ final int[] THRESHOLD_OUT_OF_RANGE_DATA = {
+ -123, 0, -324, 100000, -12222222, 1000000000, -10000, 345, 989
+ };
+ for(int threshold: THRESHOLD_OUT_OF_RANGE_DATA) {
+ runBalancerAndVerify(testnodes, threshold);
+ }
}

/**
@@ -671,7 +830,14 @@ public class TestBalancer {
@Test
public void testBalancerConfiguredWithThresholdValueAlphanumeric()
throws IOException {
- throw new UnsupportedOperationException("not implemented yet!");
+ List<DNClient> testnodes = getAllDatanodes();
+ final String[] THRESHOLD_ALPHA_DATA = {
+ "103dsf", "asd234", "asfd", "ASD", "#$asd", "2345&", "$35", "%34",
+ "0x64", "0xde", "0xad", "0xbe", "0xef"
+ };
+ for(String threshold: THRESHOLD_ALPHA_DATA) {
+ runBalancerAndVerify(testnodes, threshold);
+ }
}

/**
@@ -681,6 +847,7 @@ public class TestBalancer {
@Test
public void testBalancerRunTwoConcurrentInstancesOnSingleGateway()
throws IOException {
+ // needs run-on-gateway logic with a small balancer heap
throw new UnsupportedOperationException("not implemented yet!");
}
@@ -691,6 +858,7 @@ public class TestBalancer {
@Test
public void testBalancerRunTwoConcurrentInstancesOnDistinctGateways()
throws IOException {
+ // needs run-on-gateway logic with a small balancer heap
throw new UnsupportedOperationException("not implemented yet!");
}
@@ -700,7 +868,9 @@ public class TestBalancer {
*/
@Test
public void testBalancerOnBalancedCluster() throws IOException {
- throw new UnsupportedOperationException("not implemented yet!");
+ // run balancer twice; the second run starts from an already-balanced cluster
+ testBalancerSimple();
+ testBalancerSimple();
}

/**
@@ -709,9 +879,12 @@ public class TestBalancer {
*/
@Test
public void testBalancerWithOnlyHalfOfDataNodesRunning()
- throws IOException {
- throw new UnsupportedOperationException("not implemented yet!");
- }
+ throws IOException {
+ List<DNClient> datanodes = getAllDatanodes();
+ int testnodeCount = (int)Math.floor(datanodes.size() * 0.5);
+ List<DNClient> testnodes = reserveDatanodesForTest(testnodeCount);
+ runBalancerAndVerify(testnodes);
+ }

/**
* Balancer_23
@@ -721,6 +894,7 @@ public class TestBalancer {
@Test
public void testBalancerOnBusyClusterWithOnlyHalfOfDatanodesRunning()
throws IOException {
+ // load thread
throw new UnsupportedOperationException("not implemented yet!");
}
@@ -823,7 +997,40 @@ public class TestBalancer {
@Test
public void testNamenodeProtocolGetBlocksFromNonexistentDatanode()
throws IOException {
+ final short replication = 1;
+ Path balancerTempDir = null;
+ try {
+ // reserve 2 nodes for test
+ List<DNClient> testnodes = reserveDatanodesForTest(2);
+ shutdownNonTestNodes(testnodes);

+ DNClient testnode1 = testnodes.get(0);
+ DNClient testnode2 = testnodes.get(1);
+
+ // write some blocks with replication factor of 1
+ balancerTempDir = makeTempDir();
+ generateFileSystemLoad(20, replication);
+
+ // get block locations from NN
+ NNClient namenode = dfsCluster.getNNClient();
+ // TODO extend namenode to get block locations
+ //namenode.get
+
+ // shutdown 1 node
+ stopDatanode(testnode1);
+
+ // attempt to retrieve blocks from the dead node
+ // we should fail
+ } finally {
+ // cleanup: remove test data from DFS
+ LOG.info("clean off test data from DFS [rmr ~/balancer-temp]");
+ try {
+ deleteTempDir(balancerTempDir);
+ } catch (Exception e) {
+ LOG.warn("problem cleaning up temp dir", e);
+ }
+ }
}
}