浏览代码

YARN-5314. Fixed a ConcurrentModificationException in ATS v1.5 EntityGroupFSTimelineStore. Contributed by Li Lu.

(cherry picked from commit 673e5e02feba9171498a518c06ae70639c5f8854)
Vinod Kumar Vavilapalli 8 年之前
父节点
当前提交
687185feb2

+ 0 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java

@@ -47,7 +47,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.VersionInfo;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
@@ -61,7 +60,6 @@ import org.apache.hadoop.yarn.server.timeline.NameValuePair;
 import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
 import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
 import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;
 import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
-import org.apache.hadoop.yarn.util.YarnVersionInfo;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.apache.hadoop.yarn.webapp.BadRequestException;
 import org.apache.hadoop.yarn.webapp.BadRequestException;
 import org.apache.hadoop.yarn.webapp.ForbiddenException;
 import org.apache.hadoop.yarn.webapp.ForbiddenException;

+ 3 - 32
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java

@@ -19,19 +19,13 @@ package org.apache.hadoop.yarn.server.timeline;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
 import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
 import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
-import org.codehaus.jackson.JsonFactory;
-import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 
 
 import java.io.IOException;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicInteger;
 
 
 /**
 /**
@@ -47,15 +41,12 @@ public class EntityCacheItem {
   private EntityGroupFSTimelineStore.AppLogs appLogs;
   private EntityGroupFSTimelineStore.AppLogs appLogs;
   private long lastRefresh;
   private long lastRefresh;
   private Configuration config;
   private Configuration config;
-  private FileSystem fs;
   private int refCount = 0;
   private int refCount = 0;
   private static AtomicInteger activeStores = new AtomicInteger(0);
   private static AtomicInteger activeStores = new AtomicInteger(0);
 
 
-  public EntityCacheItem(TimelineEntityGroupId gId, Configuration config,
-      FileSystem fs) {
+  public EntityCacheItem(TimelineEntityGroupId gId, Configuration config) {
     this.groupId = gId;
     this.groupId = gId;
     this.config = config;
     this.config = config;
-    this.fs = fs;
   }
   }
 
 
   /**
   /**
@@ -97,15 +88,12 @@ public class EntityCacheItem {
    * other operations on the same cache item.
    * other operations on the same cache item.
    *
    *
    * @param aclManager ACL manager for the timeline storage
    * @param aclManager ACL manager for the timeline storage
-   * @param jsonFactory JSON factory for the storage
-   * @param objMapper Object mapper for the storage
    * @param metrics Metrics to trace the status of the entity group store
    * @param metrics Metrics to trace the status of the entity group store
    * @return a {@link org.apache.hadoop.yarn.server.timeline.TimelineStore}
    * @return a {@link org.apache.hadoop.yarn.server.timeline.TimelineStore}
    *         object filled with all entities in the group.
    *         object filled with all entities in the group.
    * @throws IOException
    * @throws IOException
    */
    */
   public synchronized TimelineStore refreshCache(TimelineACLsManager aclManager,
   public synchronized TimelineStore refreshCache(TimelineACLsManager aclManager,
-      JsonFactory jsonFactory, ObjectMapper objMapper,
       EntityGroupFSTimelineStoreMetrics metrics) throws IOException {
       EntityGroupFSTimelineStoreMetrics metrics) throws IOException {
     if (needRefresh()) {
     if (needRefresh()) {
       long startTime = Time.monotonicNow();
       long startTime = Time.monotonicNow();
@@ -128,30 +116,13 @@ public class EntityCacheItem {
           // Store is not null, the refresh is triggered by stale storage.
           // Store is not null, the refresh is triggered by stale storage.
           metrics.incrCacheStaleRefreshes();
           metrics.incrCacheStaleRefreshes();
         }
         }
-        List<LogInfo> removeList = new ArrayList<>();
         try (TimelineDataManager tdm =
         try (TimelineDataManager tdm =
                 new TimelineDataManager(store, aclManager)) {
                 new TimelineDataManager(store, aclManager)) {
           tdm.init(config);
           tdm.init(config);
           tdm.start();
           tdm.start();
-          for (LogInfo log : appLogs.getDetailLogs()) {
-            LOG.debug("Try refresh logs for {}", log.getFilename());
-            // Only refresh the log that matches the cache id
-            if (log.matchesGroupId(groupId)) {
-              Path appDirPath = appLogs.getAppDirPath();
-              if (fs.exists(log.getPath(appDirPath))) {
-                LOG.debug("Refresh logs for cache id {}", groupId);
-                log.parseForStore(tdm, appDirPath, appLogs.isDone(),
-                    jsonFactory, objMapper, fs);
-              } else {
-                // The log may have been removed, remove the log
-                removeList.add(log);
-                LOG.info("File {} no longer exists, removing it from log list",
-                    log.getPath(appDirPath));
-              }
-            }
-          }
+          // Load data from appLogs to tdm
+          appLogs.loadDetailLog(tdm, groupId);
         }
         }
-        appLogs.getDetailLogs().removeAll(removeList);
       }
       }
       updateRefreshTimeToNow();
       updateRefreshTimeToNow();
       metrics.addCacheRefreshTime(Time.monotonicNow() - startTime);
       metrics.addCacheRefreshTime(Time.monotonicNow() - startTime);

+ 28 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java

@@ -230,7 +230,6 @@ public class EntityGroupFSTimelineStore extends CompositeService
 
 
     List<TimelineEntityGroupPlugin> pluginList
     List<TimelineEntityGroupPlugin> pluginList
         = new LinkedList<TimelineEntityGroupPlugin>();
         = new LinkedList<TimelineEntityGroupPlugin>();
-    Exception caught = null;
     ClassLoader customClassLoader = null;
     ClassLoader customClassLoader = null;
     if (pluginClasspath != null && pluginClasspath.length() > 0) {
     if (pluginClasspath != null && pluginClasspath.length() > 0) {
       try {
       try {
@@ -775,8 +774,8 @@ public class EntityGroupFSTimelineStore extends CompositeService
       summaryLogs.add(log);
       summaryLogs.add(log);
     }
     }
 
 
-    private void addDetailLog(String attemptDirName, String filename,
-        String owner) {
+    private synchronized void addDetailLog(String attemptDirName,
+        String filename, String owner) {
       for (LogInfo log : detailLogs) {
       for (LogInfo log : detailLogs) {
         if (log.getFilename().equals(filename)
         if (log.getFilename().equals(filename)
             && log.getAttemptDirName().equals(attemptDirName)) {
             && log.getAttemptDirName().equals(attemptDirName)) {
@@ -786,6 +785,30 @@ public class EntityGroupFSTimelineStore extends CompositeService
       detailLogs.add(new EntityLogInfo(attemptDirName, filename, owner));
       detailLogs.add(new EntityLogInfo(attemptDirName, filename, owner));
     }
     }
 
 
+    synchronized void loadDetailLog(TimelineDataManager tdm,
+        TimelineEntityGroupId groupId) throws IOException {
+      List<LogInfo> removeList = new ArrayList<>();
+      for (LogInfo log : detailLogs) {
+        LOG.debug("Try refresh logs for {}", log.getFilename());
+        // Only refresh the log that matches the cache id
+        if (log.matchesGroupId(groupId)) {
+          Path dirPath = getAppDirPath();
+          if (fs.exists(log.getPath(dirPath))) {
+            LOG.debug("Refresh logs for cache id {}", groupId);
+            log.parseForStore(tdm, dirPath, isDone(),
+                jsonFactory, objMapper, fs);
+          } else {
+            // The log may have been removed, remove the log
+            removeList.add(log);
+            LOG.info(
+                "File {} no longer exists, removing it from log list",
+                log.getPath(dirPath));
+          }
+        }
+      }
+      detailLogs.removeAll(removeList);
+    }
+
     public synchronized void moveToDone() throws IOException {
     public synchronized void moveToDone() throws IOException {
       Path doneAppPath = getDoneAppPath(appId);
       Path doneAppPath = getDoneAppPath(appId);
       if (!doneAppPath.equals(appDirPath)) {
       if (!doneAppPath.equals(appDirPath)) {
@@ -974,7 +997,7 @@ public class EntityGroupFSTimelineStore extends CompositeService
       cacheItem = this.cachedLogs.get(groupId);
       cacheItem = this.cachedLogs.get(groupId);
       if (cacheItem == null) {
       if (cacheItem == null) {
         LOG.debug("Set up new cache item for id {}", groupId);
         LOG.debug("Set up new cache item for id {}", groupId);
-        cacheItem = new EntityCacheItem(groupId, getConfig(), fs);
+        cacheItem = new EntityCacheItem(groupId, getConfig());
         AppLogs appLogs = getAndSetAppLogs(groupId.getApplicationId());
         AppLogs appLogs = getAndSetAppLogs(groupId.getApplicationId());
         if (appLogs != null) {
         if (appLogs != null) {
           LOG.debug("Set applogs {} for group id {}", appLogs, groupId);
           LOG.debug("Set applogs {} for group id {}", appLogs, groupId);
@@ -994,8 +1017,7 @@ public class EntityGroupFSTimelineStore extends CompositeService
       // Add the reference by the store
       // Add the reference by the store
       cacheItem.incrRefs();
       cacheItem.incrRefs();
       cacheItems.add(cacheItem);
       cacheItems.add(cacheItem);
-      store = cacheItem.refreshCache(aclManager, jsonFactory, objMapper,
-          metrics);
+      store = cacheItem.refreshCache(aclManager, metrics);
     } else {
     } else {
       LOG.warn("AppLogs for group id {} is null", groupId);
       LOG.warn("AppLogs for group id {} is null", groupId);
     }
     }

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java

@@ -354,7 +354,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
         AppState.COMPLETED);
         AppState.COMPLETED);
     EntityCacheItem cacheItem = new EntityCacheItem(
     EntityCacheItem cacheItem = new EntityCacheItem(
         EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId),
         EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId),
-        config, fs);
+        config);
     cacheItem.setAppLogs(appLogs);
     cacheItem.setAppLogs(appLogs);
     store.setCachedLogs(
     store.setCachedLogs(
         EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId),
         EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId),
@@ -406,7 +406,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
             AppState.COMPLETED);
             AppState.COMPLETED);
     final EntityCacheItem cacheItem = new EntityCacheItem(
     final EntityCacheItem cacheItem = new EntityCacheItem(
         EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId),
         EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId),
-        config, fs);
+        config);
 
 
     cacheItem.setAppLogs(appLogs);
     cacheItem.setAppLogs(appLogs);
     store.setCachedLogs(
     store.setCachedLogs(
@@ -442,7 +442,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
               AppState.COMPLETED);
               AppState.COMPLETED);
       EntityCacheItem item = new EntityCacheItem(
       EntityCacheItem item = new EntityCacheItem(
           EntityGroupPlugInForTest.getStandardTimelineGroupId(appId),
           EntityGroupPlugInForTest.getStandardTimelineGroupId(appId),
-          config, fs);
+          config);
       item.setAppLogs(currAppLog);
       item.setAppLogs(currAppLog);
       store.setCachedLogs(
       store.setCachedLogs(
           EntityGroupPlugInForTest.getStandardTimelineGroupId(appId),
           EntityGroupPlugInForTest.getStandardTimelineGroupId(appId),