|
@@ -30,9 +30,7 @@ import java.net.InetAddress;
|
|
|
import java.net.URL;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
-import java.util.HashSet;
|
|
|
import java.util.List;
|
|
|
-import java.util.Set;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
@@ -44,70 +42,37 @@ import org.apache.hadoop.net.NetUtils;
|
|
|
import org.apache.hadoop.util.JarFinder;
|
|
|
import org.apache.hadoop.util.Shell;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
|
|
-import org.apache.hadoop.yarn.api.records.NodeId;
|
|
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
|
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
|
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
|
|
import org.apache.hadoop.yarn.client.api.YarnClient;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
|
|
-import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
|
|
import org.junit.After;
|
|
|
import org.junit.Assert;
|
|
|
import org.junit.Before;
|
|
|
import org.junit.Test;
|
|
|
|
|
|
-import com.google.common.collect.ImmutableMap;
|
|
|
-
|
|
|
public class TestDistributedShell {
|
|
|
|
|
|
private static final Log LOG =
|
|
|
LogFactory.getLog(TestDistributedShell.class);
|
|
|
|
|
|
- protected MiniYARNCluster yarnCluster = null;
|
|
|
- private int numNodeManager = 1;
|
|
|
-
|
|
|
- private YarnConfiguration conf = null;
|
|
|
+ protected MiniYARNCluster yarnCluster = null;
|
|
|
+ protected YarnConfiguration conf = null;
|
|
|
+ private static final int NUM_NMS = 1;
|
|
|
|
|
|
protected final static String APPMASTER_JAR =
|
|
|
JarFinder.getJar(ApplicationMaster.class);
|
|
|
-
|
|
|
- private void initializeNodeLabels() throws IOException {
|
|
|
- RMContext rmContext = yarnCluster.getResourceManager(0).getRMContext();
|
|
|
-
|
|
|
- // Setup node labels
|
|
|
- RMNodeLabelsManager labelsMgr = rmContext.getNodeLabelManager();
|
|
|
- Set<String> labels = new HashSet<String>();
|
|
|
- labels.add("x");
|
|
|
- labelsMgr.addToCluserNodeLabels(labels);
|
|
|
-
|
|
|
- // Setup queue access to node labels
|
|
|
- conf.set("yarn.scheduler.capacity.root.accessible-node-labels", "x");
|
|
|
- conf.set("yarn.scheduler.capacity.root.accessible-node-labels.x.capacity",
|
|
|
- "100");
|
|
|
- conf.set("yarn.scheduler.capacity.root.default.accessible-node-labels", "x");
|
|
|
- conf.set(
|
|
|
- "yarn.scheduler.capacity.root.default.accessible-node-labels.x.capacity",
|
|
|
- "100");
|
|
|
-
|
|
|
- rmContext.getScheduler().reinitialize(conf, rmContext);
|
|
|
-
|
|
|
- // Fetch node-ids from yarn cluster
|
|
|
- NodeId[] nodeIds = new NodeId[numNodeManager];
|
|
|
- for (int i = 0; i < numNodeManager; i++) {
|
|
|
- NodeManager mgr = this.yarnCluster.getNodeManager(i);
|
|
|
- nodeIds[i] = mgr.getNMContext().getNodeId();
|
|
|
- }
|
|
|
-
|
|
|
- // Set label x to NM[1]
|
|
|
- labelsMgr.addLabelsToNode(ImmutableMap.of(nodeIds[1], labels));
|
|
|
- }
|
|
|
|
|
|
@Before
|
|
|
public void setup() throws Exception {
|
|
|
+ setupInternal(NUM_NMS);
|
|
|
+ }
|
|
|
+
|
|
|
+ protected void setupInternal(int numNodeManager) throws Exception {
|
|
|
+
|
|
|
LOG.info("Starting up YARN cluster");
|
|
|
|
|
|
conf = new YarnConfiguration();
|
|
@@ -115,7 +80,6 @@ public class TestDistributedShell {
|
|
|
conf.set("yarn.log.dir", "target");
|
|
|
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
|
|
|
conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getName());
|
|
|
- numNodeManager = 2;
|
|
|
|
|
|
if (yarnCluster == null) {
|
|
|
yarnCluster =
|
|
@@ -127,9 +91,6 @@ public class TestDistributedShell {
|
|
|
|
|
|
waitForNMsToRegister();
|
|
|
|
|
|
- // currently only capacity scheduler support node labels,
|
|
|
- initializeNodeLabels();
|
|
|
-
|
|
|
URL url = Thread.currentThread().getContextClassLoader().getResource("yarn-site.xml");
|
|
|
if (url == null) {
|
|
|
throw new RuntimeException("Could not find 'yarn-site.xml' dummy file in classpath");
|
|
@@ -807,7 +768,7 @@ public class TestDistributedShell {
|
|
|
int sec = 60;
|
|
|
while (sec >= 0) {
|
|
|
if (yarnCluster.getResourceManager().getRMContext().getRMNodes().size()
|
|
|
- >= numNodeManager) {
|
|
|
+ >= NUM_NMS) {
|
|
|
break;
|
|
|
}
|
|
|
Thread.sleep(1000);
|
|
@@ -940,88 +901,5 @@ public class TestDistributedShell {
|
|
|
}
|
|
|
return numOfWords;
|
|
|
}
|
|
|
-
|
|
|
- @Test(timeout=90000)
|
|
|
- public void testDSShellWithNodeLabelExpression() throws Exception {
|
|
|
- // Start NMContainerMonitor
|
|
|
- NMContainerMonitor mon = new NMContainerMonitor();
|
|
|
- Thread t = new Thread(mon);
|
|
|
- t.start();
|
|
|
-
|
|
|
- // Submit a job which will sleep for 60 sec
|
|
|
- String[] args = {
|
|
|
- "--jar",
|
|
|
- APPMASTER_JAR,
|
|
|
- "--num_containers",
|
|
|
- "4",
|
|
|
- "--shell_command",
|
|
|
- "sleep",
|
|
|
- "--shell_args",
|
|
|
- "15",
|
|
|
- "--master_memory",
|
|
|
- "512",
|
|
|
- "--master_vcores",
|
|
|
- "2",
|
|
|
- "--container_memory",
|
|
|
- "128",
|
|
|
- "--container_vcores",
|
|
|
- "1",
|
|
|
- "--node_label_expression",
|
|
|
- "x"
|
|
|
- };
|
|
|
-
|
|
|
- LOG.info("Initializing DS Client");
|
|
|
- final Client client =
|
|
|
- new Client(new Configuration(yarnCluster.getConfig()));
|
|
|
- boolean initSuccess = client.init(args);
|
|
|
- Assert.assertTrue(initSuccess);
|
|
|
- LOG.info("Running DS Client");
|
|
|
- boolean result = client.run();
|
|
|
- LOG.info("Client run completed. Result=" + result);
|
|
|
-
|
|
|
- t.interrupt();
|
|
|
-
|
|
|
- // Check maximum number of containers on each NMs
|
|
|
- int[] maxRunningContainersOnNMs = mon.getMaxRunningContainersReport();
|
|
|
- // Check no container allocated on NM[0]
|
|
|
- Assert.assertEquals(0, maxRunningContainersOnNMs[0]);
|
|
|
- // Check there're some containers allocated on NM[1]
|
|
|
- Assert.assertTrue(maxRunningContainersOnNMs[1] > 0);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Monitor containers running on NMs
|
|
|
- */
|
|
|
- private class NMContainerMonitor implements Runnable {
|
|
|
- // The interval of milliseconds of sampling (500ms)
|
|
|
- final static int SAMPLING_INTERVAL_MS = 500;
|
|
|
-
|
|
|
- // The maximum number of containers running on each NMs
|
|
|
- int[] maxRunningContainersOnNMs = new int[numNodeManager];
|
|
|
-
|
|
|
- @Override
|
|
|
- public void run() {
|
|
|
- while (true) {
|
|
|
- for (int i = 0; i < numNodeManager; i++) {
|
|
|
- int nContainers =
|
|
|
- yarnCluster.getNodeManager(i).getNMContext().getContainers()
|
|
|
- .size();
|
|
|
- if (nContainers > maxRunningContainersOnNMs[i]) {
|
|
|
- maxRunningContainersOnNMs[i] = nContainers;
|
|
|
- }
|
|
|
- }
|
|
|
- try {
|
|
|
- Thread.sleep(SAMPLING_INTERVAL_MS);
|
|
|
- } catch (InterruptedException e) {
|
|
|
- e.printStackTrace();
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public int[] getMaxRunningContainersReport() {
|
|
|
- return maxRunningContainersOnNMs;
|
|
|
- }
|
|
|
- }
|
|
|
}
|
|
|
|