瀏覽代碼

AMBARI-3797. OOM due to ActionScheduler implementation. (odiachenko)

Oleksandr Diachenko 11 年之前
父節點
當前提交
346a50c35c
共有 1 個文件被更改,包括 19 次插入2 次删除
  1. 19 2
      ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java

+ 19 - 2
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java

@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.Role;
@@ -46,6 +47,8 @@ import org.apache.ambari.server.utils.StageUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import com.google.common.reflect.TypeToken;
 import com.google.inject.persist.UnitOfWork;
 
@@ -77,6 +80,7 @@ class ActionScheduler implements Runnable {
    * we receive awake() request during running a scheduler iteration.
    */
   private boolean activeAwakeRequest = false;
+  private Cache<Long, Map<String, List<String>>> clusterHostInfoCache;
 
   public ActionScheduler(long sleepTimeMilliSec, long actionTimeoutMilliSec,
       ActionDBAccessor db, ActionQueue actionQueue, Clusters fsmObject,
@@ -90,6 +94,9 @@ class ActionScheduler implements Runnable {
     this.maxAttempts = (short) maxAttempts;
     this.serverActionManager = serverActionManager;
     this.unitOfWork = unitOfWork;
+    this.clusterHostInfoCache = CacheBuilder.newBuilder().
+        expireAfterAccess(5, TimeUnit.MINUTES).
+        build();
   }
 
   public void start() {
@@ -441,8 +448,18 @@ class ActionScheduler implements Runnable {
     cmd.setHostname(hostsMap.getHostMap(hostname));
     
 
-    Type type = new TypeToken<Map<String, List<String>>>() {}.getType();
-    Map<String, List<String>> clusterHostInfo = StageUtils.getGson().fromJson(s.getClusterHostInfo(), type);
+    //Try to get clusterHostInfo from cache
+    Map<String, List<String>> clusterHostInfo = clusterHostInfoCache.getIfPresent(s.getStageId());
+    
+    LOG.info("Cluster host info cache size: " + clusterHostInfoCache.size());
+    
+
+    if (clusterHostInfo == null) {
+      Type type = new TypeToken<Map<String, List<String>>>() {}.getType();
+      clusterHostInfo = StageUtils.getGson().fromJson(s.getClusterHostInfo(), type);
+      clusterHostInfoCache.put(s.getStageId(), clusterHostInfo);
+    }
+    
     cmd.setClusterHostInfo(clusterHostInfo);
 
     actionQueue.enqueue(hostname, cmd);