瀏覽代碼

YARN-8137. Parallelize node addition in SLS. Contributed by Abhishek Modi.

Inigo Goiri 7 年之前
父節點
當前提交
fd24fd0ff7
共有 1 個文件被更改,包括 39 次插入和 18 次删除
  1. 39 18
      hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java

+ 39 - 18
hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java

@@ -33,6 +33,9 @@ import java.util.Random;
 import java.util.Set;
 import java.util.Collections;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -85,6 +88,7 @@ import org.apache.hadoop.yarn.sls.utils.SLSUtils;
 import org.apache.hadoop.yarn.util.UTCClock;
 import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
+import org.eclipse.jetty.util.ConcurrentHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -96,9 +100,10 @@ public class SLSRunner extends Configured implements Tool {
   private static TaskRunner runner = new TaskRunner();
   private String[] inputTraces;
   private Map<String, Integer> queueAppNumMap;
+  private int poolSize;
 
   // NM simulator
-  private HashMap<NodeId, NMSimulator> nmMap;
+  private Map<NodeId, NMSimulator> nmMap;
   private Resource nodeManagerResource;
   private String nodeFile;
 
@@ -158,7 +163,7 @@ public class SLSRunner extends Configured implements Tool {
   }
 
   private void init(Configuration tempConf) throws ClassNotFoundException {
-    nmMap = new HashMap<>();
+    nmMap = new ConcurrentHashMap<>();
     queueAppNumMap = new HashMap<>();
     amMap = new ConcurrentHashMap<>();
     amClassMap = new HashMap<>();
@@ -167,7 +172,7 @@ public class SLSRunner extends Configured implements Tool {
     setConf(tempConf);
 
     // runner
-    int poolSize = tempConf.getInt(SLSConfiguration.RUNNER_POOL_SIZE,
+    poolSize = tempConf.getInt(SLSConfiguration.RUNNER_POOL_SIZE,
         SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT);
     SLSRunner.runner.setQueueSize(poolSize);
     // <AMType, Class> map
@@ -283,7 +288,8 @@ public class SLSRunner extends Configured implements Tool {
     rm.start();
   }
 
-  private void startNM() throws YarnException, IOException {
+  private void startNM() throws YarnException, IOException,
+      InterruptedException {
     // nm configuration
     int heartbeatInterval = getConf().getInt(
         SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS,
@@ -333,21 +339,36 @@ public class SLSRunner extends Configured implements Tool {
 
     // create NM simulators
     Random random = new Random();
-    Set<String> rackSet = new HashSet<String>();
+    Set<String> rackSet = new ConcurrentHashSet<>();
+    int threadPoolSize = Math.max(poolSize,
+        SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT);
+    ExecutorService executorService = Executors.
+        newFixedThreadPool(threadPoolSize);
     for (Map.Entry<String, Resource> entry : nodeResourceMap.entrySet()) {
-      // we randomize the heartbeat start time from zero to 1 interval
-      NMSimulator nm = new NMSimulator();
-      Resource nmResource = nodeManagerResource;
-      String hostName = entry.getKey();
-      if (entry.getValue() != null) {
-        nmResource = entry.getValue();
-      }
-      nm.init(hostName, nmResource, random.nextInt(heartbeatInterval),
-          heartbeatInterval, rm, resourceUtilizationRatio);
-      nmMap.put(nm.getNode().getNodeID(), nm);
-      runner.schedule(nm);
-      rackSet.add(nm.getNode().getRackName());
+      executorService.submit(new Runnable() {
+        @Override public void run() {
+          try {
+            // we randomize the heartbeat start time from zero to 1 interval
+            NMSimulator nm = new NMSimulator();
+            Resource nmResource = nodeManagerResource;
+            String hostName = entry.getKey();
+            if (entry.getValue() != null) {
+              nmResource = entry.getValue();
+            }
+            nm.init(hostName, nmResource,
+                random.nextInt(heartbeatInterval),
+                heartbeatInterval, rm, resourceUtilizationRatio);
+            nmMap.put(nm.getNode().getNodeID(), nm);
+            runner.schedule(nm);
+            rackSet.add(nm.getNode().getRackName());
+          } catch (IOException | YarnException e) {
+            LOG.error("Got an error while adding node", e);
+          }
+        }
+      });
     }
+    executorService.shutdown();
+    executorService.awaitTermination(10, TimeUnit.MINUTES);
     numRacks = rackSet.size();
     numNMs = nmMap.size();
   }
@@ -839,7 +860,7 @@ public class SLSRunner extends Configured implements Tool {
             (long)(Math.ceil(maxRuntime / 1000.0)));
   }
 
-  public HashMap<NodeId, NMSimulator> getNmMap() {
+  public Map<NodeId, NMSimulator> getNmMap() {
     return nmMap;
   }