@@ -30,7 +30,9 @@ import java.net.InetAddress;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;

 import org.apache.commons.logging.Log;
@@ -42,6 +44,7 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.JarFinder;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
@@ -49,40 +52,81 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;

+import com.google.common.collect.ImmutableMap;
+
 public class TestDistributedShell {

   private static final Log LOG =
       LogFactory.getLog(TestDistributedShell.class);

   protected MiniYARNCluster yarnCluster = null;
-  protected Configuration conf = new YarnConfiguration();
+  private int numNodeManager = 1;
+
+  private YarnConfiguration conf = null;

   protected final static String APPMASTER_JAR =
       JarFinder.getJar(ApplicationMaster.class);
+
+  private void initializeNodeLabels() throws IOException {
+    RMContext rmContext = yarnCluster.getResourceManager(0).getRMContext();
+
+    // Setup node labels
+    RMNodeLabelsManager labelsMgr = rmContext.getNodeLabelManager();
+    Set<String> labels = new HashSet<String>();
+    labels.add("x");
+    labelsMgr.addToCluserNodeLabels(labels);
+
+    // Setup queue access to node labels
+    conf.set("yarn.scheduler.capacity.root.accessible-node-labels", "x");
+    conf.set("yarn.scheduler.capacity.root.default.accessible-node-labels", "x");
+    conf.set(
+        "yarn.scheduler.capacity.root.default.accessible-node-labels.x.capacity",
+        "100");
+
+    rmContext.getScheduler().reinitialize(conf, rmContext);
+
+    // Fetch node-ids from yarn cluster
+    NodeId[] nodeIds = new NodeId[numNodeManager];
+    for (int i = 0; i < numNodeManager; i++) {
+      NodeManager mgr = this.yarnCluster.getNodeManager(i);
+      nodeIds[i] = mgr.getNMContext().getNodeId();
+    }
+
+    // Set label x to NM[1]
+    labelsMgr.addLabelsToNode(ImmutableMap.of(nodeIds[1], labels));
+  }

   @Before
   public void setup() throws Exception {
     LOG.info("Starting up YARN cluster");
+
+    conf = new YarnConfiguration();
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128);
-    conf.setClass(YarnConfiguration.RM_SCHEDULER,
-        FifoScheduler.class, ResourceScheduler.class);
     conf.set("yarn.log.dir", "target");
     conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getName());
+    numNodeManager = 2;
+
     if (yarnCluster == null) {
-      yarnCluster = new MiniYARNCluster(
-        TestDistributedShell.class.getSimpleName(), 1, 1, 1, 1, true);
+      yarnCluster =
+          new MiniYARNCluster(TestDistributedShell.class.getSimpleName(), 1,
+              numNodeManager, 1, 1, true);
       yarnCluster.init(conf);
+
       yarnCluster.start();
-      NodeManager nm = yarnCluster.getNodeManager(0);
-      waitForNMToRegister(nm);
+
+      waitForNMsToRegister();
+
+      // currently only capacity scheduler support node labels,
+      initializeNodeLabels();

       URL url = Thread.currentThread().getContextClassLoader().getResource("yarn-site.xml");
       if (url == null) {
@@ -757,13 +801,15 @@ public class TestDistributedShell {
     }
   }

-  protected static void waitForNMToRegister(NodeManager nm)
-      throws Exception {
-    int attempt = 60;
-    ContainerManagerImpl cm =
-        ((ContainerManagerImpl) nm.getNMContext().getContainerManager());
-    while (cm.getBlockNewContainerRequestsStatus() && attempt-- > 0) {
-      Thread.sleep(2000);
+  protected void waitForNMsToRegister() throws Exception {
+    int sec = 60;
+    while (sec >= 0) {
+      if (yarnCluster.getResourceManager().getRMContext().getRMNodes().size()
+          >= numNodeManager) {
+        break;
+      }
+      Thread.sleep(1000);
+      sec--;
     }
   }

@@ -892,5 +938,88 @@ public class TestDistributedShell {
     }
     return numOfWords;
   }
+
+  @Test(timeout=90000)
+  public void testDSShellWithNodeLabelExpression() throws Exception {
+    // Start NMContainerMonitor
+    NMContainerMonitor mon = new NMContainerMonitor();
+    Thread t = new Thread(mon);
+    t.start();
+
+    // Submit a job which will sleep for 60 sec
+    String[] args = {
+        "--jar",
+        APPMASTER_JAR,
+        "--num_containers",
+        "4",
+        "--shell_command",
+        "sleep",
+        "--shell_args",
+        "15",
+        "--master_memory",
+        "512",
+        "--master_vcores",
+        "2",
+        "--container_memory",
+        "128",
+        "--container_vcores",
+        "1",
+        "--node_label_expression",
+        "x"
+    };
+
+    LOG.info("Initializing DS Client");
+    final Client client =
+        new Client(new Configuration(yarnCluster.getConfig()));
+    boolean initSuccess = client.init(args);
+    Assert.assertTrue(initSuccess);
+    LOG.info("Running DS Client");
+    boolean result = client.run();
+    LOG.info("Client run completed. Result=" + result);
+
+    t.interrupt();
+
+    // Check maximum number of containers on each NMs
+    int[] maxRunningContainersOnNMs = mon.getMaxRunningContainersReport();
+    // Check no container allocated on NM[0]
+    Assert.assertEquals(0, maxRunningContainersOnNMs[0]);
+    // Check there're some containers allocated on NM[1]
+    Assert.assertTrue(maxRunningContainersOnNMs[1] > 0);
+  }
+
+  /**
+   * Monitor containers running on NMs
+   */
+  private class NMContainerMonitor implements Runnable {
+    // The interval of milliseconds of sampling (500ms)
+    final static int SAMPLING_INTERVAL_MS = 500;
+
+    // The maximum number of containers running on each NMs
+    int[] maxRunningContainersOnNMs = new int[numNodeManager];
+
+    @Override
+    public void run() {
+      while (true) {
+        for (int i = 0; i < numNodeManager; i++) {
+          int nContainers =
+              yarnCluster.getNodeManager(i).getNMContext().getContainers()
+                  .size();
+          if (nContainers > maxRunningContainersOnNMs[i]) {
+            maxRunningContainersOnNMs[i] = nContainers;
+          }
+        }
+        try {
+          Thread.sleep(SAMPLING_INTERVAL_MS);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+          break;
+        }
+      }
+    }
+
+    public int[] getMaxRunningContainersReport() {
+      return maxRunningContainersOnNMs;
+    }
+  }
 }
