Ver código fonte

YARN-8134. Support specifying node resources in SLS. Contributed by Abhishek Modi.

Inigo Goiri 7 anos atrás
pai
commit
78860372bd

+ 1 - 0
hadoop-tools/hadoop-sls/pom.xml

@@ -137,6 +137,7 @@
             <exclude>src/test/resources/syn_stream.json</exclude>
             <exclude>src/test/resources/inputsls.json</exclude>
             <exclude>src/test/resources/nodes.json</exclude>
+            <exclude>src/test/resources/nodes-with-resources.json</exclude>
             <exclude>src/test/resources/exit-invariants.txt</exclude>
             <exclude>src/test/resources/ongoing-invariants.txt</exclude>
           </excludes>

+ 25 - 10
hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java

@@ -292,21 +292,30 @@ public class SLSRunner extends Configured implements Tool {
         SLSConfiguration.NM_RESOURCE_UTILIZATION_RATIO,
         SLSConfiguration.NM_RESOURCE_UTILIZATION_RATIO_DEFAULT);
     // nm information (fetch from topology file, or from sls/rumen json file)
-    Set<String> nodeSet = new HashSet<String>();
+    Map<String, Resource> nodeResourceMap = new HashMap<>();
+    Set<? extends  String> nodeSet;
     if (nodeFile.isEmpty()) {
       for (String inputTrace : inputTraces) {
-
         switch (inputType) {
         case SLS:
-          nodeSet.addAll(SLSUtils.parseNodesFromSLSTrace(inputTrace));
+          nodeSet = SLSUtils.parseNodesFromSLSTrace(inputTrace);
+          for (String node : nodeSet) {
+            nodeResourceMap.put(node, null);
+          }
           break;
         case RUMEN:
-          nodeSet.addAll(SLSUtils.parseNodesFromRumenTrace(inputTrace));
+          nodeSet = SLSUtils.parseNodesFromRumenTrace(inputTrace);
+          for (String node : nodeSet) {
+            nodeResourceMap.put(node, null);
+          }
           break;
         case SYNTH:
           stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0]));
-          nodeSet.addAll(SLSUtils.generateNodes(stjp.getNumNodes(),
-              stjp.getNumNodes()/stjp.getNodesPerRack()));
+          nodeSet = SLSUtils.generateNodes(stjp.getNumNodes(),
+              stjp.getNumNodes()/stjp.getNodesPerRack());
+          for (String node : nodeSet) {
+            nodeResourceMap.put(node, null);
+          }
           break;
         default:
           throw new YarnException("Input configuration not recognized, "
@@ -314,20 +323,26 @@ public class SLSRunner extends Configured implements Tool {
         }
       }
     } else {
-      nodeSet.addAll(SLSUtils.parseNodesFromNodeFile(nodeFile));
+      nodeResourceMap = SLSUtils.parseNodesFromNodeFile(nodeFile,
+          nodeManagerResource);
     }
 
-    if (nodeSet.size() == 0) {
+    if (nodeResourceMap.size() == 0) {
       throw new YarnException("No node! Please configure nodes.");
     }
 
     // create NM simulators
     Random random = new Random();
     Set<String> rackSet = new HashSet<String>();
-    for (String hostName : nodeSet) {
+    for (Map.Entry<String, Resource> entry : nodeResourceMap.entrySet()) {
       // we randomize the heartbeat start time from zero to 1 interval
       NMSimulator nm = new NMSimulator();
-      nm.init(hostName, nodeManagerResource, random.nextInt(heartbeatInterval),
+      Resource nmResource = nodeManagerResource;
+      String hostName = entry.getKey();
+      if (entry.getValue() != null) {
+        nmResource = entry.getValue();
+      }
+      nm.init(hostName, nmResource, random.nextInt(heartbeatInterval),
           heartbeatInterval, rm, resourceUtilizationRatio);
       nmMap.put(nm.getNode().getNodeID(), nm);
       runner.schedule(nm);

+ 19 - 5
hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java

@@ -22,6 +22,8 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.Reader;
+
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -39,7 +41,11 @@ import org.apache.hadoop.tools.rumen.JobTraceReader;
 import org.apache.hadoop.tools.rumen.LoggedJob;
 import org.apache.hadoop.tools.rumen.LoggedTask;
 import org.apache.hadoop.tools.rumen.LoggedTaskAttempt;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.apache.hadoop.yarn.util.resource.Resources;
 
 @Private
 @Unstable
@@ -145,9 +151,9 @@ public class SLSUtils {
   /**
    * parse the input node file, return each host name
    */
-  public static Set<String> parseNodesFromNodeFile(String nodeFile)
-          throws IOException {
-    Set<String> nodeSet = new HashSet<String>();
+  public static Map<String, Resource> parseNodesFromNodeFile(String nodeFile,
+      Resource nmDefaultResource) throws IOException {
+    Map<String, Resource> nodeResourceMap = new HashMap<>();
     JsonFactory jsonF = new JsonFactory();
     ObjectMapper mapper = new ObjectMapper();
     Reader input =
@@ -160,13 +166,21 @@ public class SLSUtils {
         List tasks = (List) jsonE.get("nodes");
         for (Object o : tasks) {
           Map jsonNode = (Map) o;
-          nodeSet.add(rack + "/" + jsonNode.get("node"));
+          Resource nodeResource = Resources.clone(nmDefaultResource);
+          ResourceInformation[] infors = ResourceUtils.getResourceTypesArray();
+          for (ResourceInformation info : infors) {
+            if (jsonNode.get(info.getName()) != null) {
+              nodeResource.setResourceValue(info.getName(),
+                  Integer.parseInt(jsonNode.get(info.getName()).toString()));
+            }
+          }
+          nodeResourceMap.put(rack + "/" + jsonNode.get("node"), nodeResource);
         }
       }
     } finally {
       input.close();
     }
-    return nodeSet;
+    return nodeResourceMap;
   }
 
   public static Set<? extends String> generateNodes(int numNodes,

+ 25 - 0
hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/utils/TestSLSUtils.java

@@ -18,10 +18,13 @@
 
 package org.apache.hadoop.yarn.sls.utils;
 
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 
 public class TestSLSUtils {
@@ -39,6 +42,28 @@ public class TestSLSUtils {
     Assert.assertEquals(rackHostname[1], "node1");
   }
 
+  @Test
+  public void testParseNodesFromNodeFile() throws Exception {
+    String nodeFile = "src/test/resources/nodes.json";
+    Map<String, Resource> nodeResourceMap = SLSUtils.parseNodesFromNodeFile(
+        nodeFile, Resources.createResource(1024, 2));
+    Assert.assertEquals(20, nodeResourceMap.size());
+
+    nodeFile = "src/test/resources/nodes-with-resources.json";
+    nodeResourceMap = SLSUtils.parseNodesFromNodeFile(
+        nodeFile, Resources.createResource(1024, 2));
+    Assert.assertEquals(4,
+        nodeResourceMap.size());
+    Assert.assertEquals(2048,
+        nodeResourceMap.get("/rack1/node1").getMemorySize());
+    Assert.assertEquals(6,
+        nodeResourceMap.get("/rack1/node1").getVirtualCores());
+    Assert.assertEquals(1024,
+        nodeResourceMap.get("/rack1/node2").getMemorySize());
+    Assert.assertEquals(2,
+        nodeResourceMap.get("/rack1/node2").getVirtualCores());
+  }
+
   @Test
   public void testGenerateNodes() {
     Set<? extends String> nodes = SLSUtils.generateNodes(3, 3);

+ 19 - 0
hadoop-tools/hadoop-sls/src/test/resources/nodes-with-resources.json

@@ -0,0 +1,19 @@
+{
+  "rack": "rack1",
+  "nodes": [
+    {
+      "node": "node1",
+      "memory-mb" : 2048,
+      "vcores" : 6
+    },
+    {
+      "node": "node2"
+    },
+    {
+      "node": "node3"
+    },
+    {
+      "node": "node4"
+    }
+  ]
+}