|
@@ -26,6 +26,7 @@ import java.net.URI;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
|
+import java.util.LinkedHashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Set;
|
|
@@ -97,6 +98,7 @@ public class FileSystemTimelineWriter extends TimelineWriter{
|
|
|
private long ttl;
|
|
|
private LogFDsCache logFDsCache = null;
|
|
|
private boolean isAppendSupported;
|
|
|
+ private final AttemptDirCache attemptDirCache;
|
|
|
|
|
|
public FileSystemTimelineWriter(Configuration conf,
|
|
|
UserGroupInformation authUgi, Client client, URI resURI)
|
|
@@ -158,6 +160,15 @@ public class FileSystemTimelineWriter extends TimelineWriter{
|
|
|
|
|
|
objMapper = createObjectMapper();
|
|
|
|
|
|
+ int attemptDirCacheSize = conf.getInt(
|
|
|
+ YarnConfiguration
|
|
|
+ .TIMELINE_SERVICE_CLIENT_INTERNAL_ATTEMPT_DIR_CACHE_SIZE,
|
|
|
+ YarnConfiguration
|
|
|
+ .DEFAULT_TIMELINE_SERVICE_CLIENT_INTERNAL_ATTEMPT_DIR_CACHE_SIZE);
|
|
|
+
|
|
|
+ attemptDirCache =
|
|
|
+ new AttemptDirCache(attemptDirCacheSize, fs, activePath);
|
|
|
+
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
StringBuilder debugMSG = new StringBuilder();
|
|
|
debugMSG.append(
|
|
@@ -199,7 +210,7 @@ public class FileSystemTimelineWriter extends TimelineWriter{
|
|
|
= new ArrayList<TimelineEntity>();
|
|
|
List<TimelineEntity> entitiesToEntityCache
|
|
|
= new ArrayList<TimelineEntity>();
|
|
|
- Path attemptDir = createAttemptDir(appAttemptId);
|
|
|
+ Path attemptDir = attemptDirCache.getAppAttemptDir(appAttemptId);
|
|
|
|
|
|
for (TimelineEntity entity : entities) {
|
|
|
if (summaryEntityTypes.contains(entity.getEntityType())) {
|
|
@@ -279,32 +290,11 @@ public class FileSystemTimelineWriter extends TimelineWriter{
|
|
|
return mapper;
|
|
|
}
|
|
|
|
|
|
- private Path createAttemptDir(ApplicationAttemptId appAttemptId)
|
|
|
- throws IOException {
|
|
|
- Path appDir = createApplicationDir(appAttemptId.getApplicationId());
|
|
|
-
|
|
|
- Path attemptDir = new Path(appDir, appAttemptId.toString());
|
|
|
- if (!fs.exists(attemptDir)) {
|
|
|
- FileSystem.mkdirs(fs, attemptDir, new FsPermission(
|
|
|
- APP_LOG_DIR_PERMISSIONS));
|
|
|
- }
|
|
|
- return attemptDir;
|
|
|
- }
|
|
|
-
|
|
|
- private Path createApplicationDir(ApplicationId appId) throws IOException {
|
|
|
- Path appDir =
|
|
|
- new Path(activePath, appId.toString());
|
|
|
- if (!fs.exists(appDir)) {
|
|
|
- FileSystem.mkdirs(fs, appDir, new FsPermission(APP_LOG_DIR_PERMISSIONS));
|
|
|
- }
|
|
|
- return appDir;
|
|
|
- }
|
|
|
-
|
|
|
private void writeDomain(ApplicationAttemptId appAttemptId,
|
|
|
TimelineDomain domain) throws IOException {
|
|
|
Path domainLogPath =
|
|
|
- new Path(createAttemptDir(appAttemptId), DOMAIN_LOG_PREFIX
|
|
|
- + appAttemptId.toString());
|
|
|
+ new Path(attemptDirCache.getAppAttemptDir(appAttemptId),
|
|
|
+ DOMAIN_LOG_PREFIX + appAttemptId.toString());
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Writing domains for " + appAttemptId.toString() + " to "
|
|
|
+ domainLogPath);
|
|
@@ -958,4 +948,70 @@ public class FileSystemTimelineWriter extends TimelineWriter{
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ private static class AttemptDirCache {
|
|
|
+ private final int attemptDirCacheSize;
|
|
|
+ private final Map<ApplicationAttemptId, Path> attemptDirCache;
|
|
|
+ private final FileSystem fs;
|
|
|
+ private final Path activePath;
|
|
|
+
|
|
|
+ public AttemptDirCache(int cacheSize, FileSystem fs, Path activePath) {
|
|
|
+ this.attemptDirCacheSize = cacheSize;
|
|
|
+ this.attemptDirCache =
|
|
|
+ new LinkedHashMap<ApplicationAttemptId, Path>(
|
|
|
+ attemptDirCacheSize, 0.75f, true) {
|
|
|
+ private static final long serialVersionUID = 1L;
|
|
|
+ @Override
|
|
|
+ protected boolean removeEldestEntry(
|
|
|
+ Map.Entry<ApplicationAttemptId, Path> eldest) {
|
|
|
+ return size() > attemptDirCacheSize;
|
|
|
+ }
|
|
|
+ };
|
|
|
+ this.fs = fs;
|
|
|
+ this.activePath = activePath;
|
|
|
+ }
|
|
|
+
|
|
|
+ public Path getAppAttemptDir(ApplicationAttemptId attemptId)
|
|
|
+ throws IOException {
|
|
|
+ Path attemptDir = this.attemptDirCache.get(attemptId);
|
|
|
+ if (attemptDir == null) {
|
|
|
+ synchronized(this) {
|
|
|
+ attemptDir = this.attemptDirCache.get(attemptId);
|
|
|
+ if (attemptDir == null) {
|
|
|
+ attemptDir = createAttemptDir(attemptId);
|
|
|
+ attemptDirCache.put(attemptId, attemptDir);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return attemptDir;
|
|
|
+ }
|
|
|
+
|
|
|
+ private Path createAttemptDir(ApplicationAttemptId appAttemptId)
|
|
|
+ throws IOException {
|
|
|
+ Path appDir = createApplicationDir(appAttemptId.getApplicationId());
|
|
|
+
|
|
|
+ Path attemptDir = new Path(appDir, appAttemptId.toString());
|
|
|
+ if (!fs.exists(attemptDir)) {
|
|
|
+ FileSystem.mkdirs(fs, attemptDir, new FsPermission(
|
|
|
+ APP_LOG_DIR_PERMISSIONS));
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("New attempt directory created - " + attemptDir);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return attemptDir;
|
|
|
+ }
|
|
|
+
|
|
|
+ private Path createApplicationDir(ApplicationId appId) throws IOException {
|
|
|
+ Path appDir =
|
|
|
+ new Path(activePath, appId.toString());
|
|
|
+ if (!fs.exists(appDir)) {
|
|
|
+ FileSystem.mkdirs(fs, appDir,
|
|
|
+ new FsPermission(APP_LOG_DIR_PERMISSIONS));
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("New app directory created - " + appDir);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return appDir;
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|