|
@@ -28,6 +28,7 @@ import java.util.Date;
|
|
import java.util.HashMap;
|
|
import java.util.HashMap;
|
|
import java.util.HashSet;
|
|
import java.util.HashSet;
|
|
import java.util.Iterator;
|
|
import java.util.Iterator;
|
|
|
|
+import java.util.LinkedHashMap;
|
|
import java.util.LinkedList;
|
|
import java.util.LinkedList;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
import java.util.Random;
|
|
import java.util.Random;
|
|
@@ -67,8 +68,8 @@ import org.apache.hadoop.mapreduce.security.TokenCache;
|
|
*/
|
|
*/
|
|
public class TrackerDistributedCacheManager {
|
|
public class TrackerDistributedCacheManager {
|
|
// cacheID to cacheStatus mapping
|
|
// cacheID to cacheStatus mapping
|
|
- private TreeMap<String, CacheStatus> cachedArchives =
|
|
|
|
- new TreeMap<String, CacheStatus>();
|
|
|
|
|
|
+ private LinkedHashMap<String, CacheStatus> cachedArchives =
|
|
|
|
+ new LinkedHashMap<String, CacheStatus>();
|
|
private Map<JobID, TaskDistributedCacheManager> jobArchives =
|
|
private Map<JobID, TaskDistributedCacheManager> jobArchives =
|
|
Collections.synchronizedMap(
|
|
Collections.synchronizedMap(
|
|
new HashMap<JobID, TaskDistributedCacheManager>());
|
|
new HashMap<JobID, TaskDistributedCacheManager>());
|
|
@@ -79,8 +80,11 @@ public class TrackerDistributedCacheManager {
|
|
// default total cache size (10GB)
|
|
// default total cache size (10GB)
|
|
private static final long DEFAULT_CACHE_SIZE = 10737418240L;
|
|
private static final long DEFAULT_CACHE_SIZE = 10737418240L;
|
|
private static final long DEFAULT_CACHE_SUBDIR_LIMIT = 10000;
|
|
private static final long DEFAULT_CACHE_SUBDIR_LIMIT = 10000;
|
|
|
|
+ private static final float DEFAULT_CACHE_KEEP_AROUND_PCT = 0.95f;
|
|
private long allowedCacheSize;
|
|
private long allowedCacheSize;
|
|
private long allowedCacheSubdirs;
|
|
private long allowedCacheSubdirs;
|
|
|
|
+ private long allowedCacheSizeCleanupGoal;
|
|
|
|
+ private long allowedCacheSubdirsCleanupGoal;
|
|
|
|
|
|
private static final Log LOG =
|
|
private static final Log LOG =
|
|
LogFactory.getLog(TrackerDistributedCacheManager.class);
|
|
LogFactory.getLog(TrackerDistributedCacheManager.class);
|
|
@@ -110,6 +114,13 @@ public class TrackerDistributedCacheManager {
|
|
this.allowedCacheSubdirs = conf.getLong
|
|
this.allowedCacheSubdirs = conf.getLong
|
|
("mapreduce.tasktracker.local.cache.numberdirectories",
|
|
("mapreduce.tasktracker.local.cache.numberdirectories",
|
|
DEFAULT_CACHE_SUBDIR_LIMIT);
|
|
DEFAULT_CACHE_SUBDIR_LIMIT);
|
|
|
|
+ double cleanupPct = conf.getFloat("mapreduce.tasktracker.cache.local.keep.pct",
|
|
|
|
+ DEFAULT_CACHE_KEEP_AROUND_PCT);
|
|
|
|
+ this.allowedCacheSizeCleanupGoal =
|
|
|
|
+ (long)(this.allowedCacheSize * cleanupPct);
|
|
|
|
+ this.allowedCacheSubdirsCleanupGoal =
|
|
|
|
+ (long)(this.allowedCacheSubdirs * cleanupPct);
|
|
|
|
+
|
|
this.taskController = controller;
|
|
this.taskController = controller;
|
|
this.cleanupThread = new CleanupThread(conf);
|
|
this.cleanupThread = new CleanupThread(conf);
|
|
}
|
|
}
|
|
@@ -162,15 +173,13 @@ public class TrackerDistributedCacheManager {
|
|
lcacheStatus =
|
|
lcacheStatus =
|
|
new CacheStatus(new Path(localPath.toString().replace(cachePath, "")),
|
|
new CacheStatus(new Path(localPath.toString().replace(cachePath, "")),
|
|
localPath, new Path(subDir), uniqueString,
|
|
localPath, new Path(subDir), uniqueString,
|
|
- isPublic ? null : user);
|
|
|
|
|
|
+ isPublic ? null : user, key);
|
|
cachedArchives.put(key, lcacheStatus);
|
|
cachedArchives.put(key, lcacheStatus);
|
|
}
|
|
}
|
|
|
|
|
|
//mark the cache for use.
|
|
//mark the cache for use.
|
|
file.setStatus(lcacheStatus);
|
|
file.setStatus(lcacheStatus);
|
|
- synchronized (lcacheStatus) {
|
|
|
|
- lcacheStatus.refcount++;
|
|
|
|
- }
|
|
|
|
|
|
+ lcacheStatus.incRefCount();
|
|
}
|
|
}
|
|
|
|
|
|
try {
|
|
try {
|
|
@@ -203,11 +212,8 @@ public class TrackerDistributedCacheManager {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
} catch (IOException ie) {
|
|
} catch (IOException ie) {
|
|
- synchronized (lcacheStatus) {
|
|
|
|
- // release this cache
|
|
|
|
- lcacheStatus.refcount -= 1;
|
|
|
|
- throw ie;
|
|
|
|
- }
|
|
|
|
|
|
+ lcacheStatus.decRefCount();
|
|
|
|
+ throw ie;
|
|
}
|
|
}
|
|
return localizedPath;
|
|
return localizedPath;
|
|
}
|
|
}
|
|
@@ -223,9 +229,7 @@ public class TrackerDistributedCacheManager {
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
void releaseCache(CacheStatus status) throws IOException {
|
|
void releaseCache(CacheStatus status) throws IOException {
|
|
- synchronized (status) {
|
|
|
|
- status.refcount--;
|
|
|
|
- }
|
|
|
|
|
|
+ status.decRefCount();
|
|
}
|
|
}
|
|
|
|
|
|
void setSize(CacheStatus status, long size) throws IOException {
|
|
void setSize(CacheStatus status, long size) throws IOException {
|
|
@@ -241,9 +245,7 @@ public class TrackerDistributedCacheManager {
|
|
* This method is called from unit tests.
|
|
* This method is called from unit tests.
|
|
*/
|
|
*/
|
|
int getReferenceCount(CacheStatus status) throws IOException {
|
|
int getReferenceCount(CacheStatus status) throws IOException {
|
|
- synchronized (status) {
|
|
|
|
- return status.refcount;
|
|
|
|
- }
|
|
|
|
|
|
+ return status.getRefCount();
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -540,11 +542,11 @@ public class TrackerDistributedCacheManager {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- static class CacheStatus {
|
|
|
|
|
|
+ class CacheStatus {
|
|
//
|
|
//
|
|
// This field should be accessed under global cachedArchives lock.
|
|
// This field should be accessed under global cachedArchives lock.
|
|
//
|
|
//
|
|
- int refcount; // number of instances using this cache
|
|
|
|
|
|
+ private int refcount; // number of instances using this cache
|
|
|
|
|
|
//
|
|
//
|
|
// The following two fields should be accessed under
|
|
// The following two fields should be accessed under
|
|
@@ -568,9 +570,11 @@ public class TrackerDistributedCacheManager {
|
|
Path localizedBaseDir;
|
|
Path localizedBaseDir;
|
|
// The user that owns the cache entry or null if it is public
|
|
// The user that owns the cache entry or null if it is public
|
|
final String user;
|
|
final String user;
|
|
|
|
+ //The key of this in the cachedArchives.
|
|
|
|
+ private final String key;
|
|
|
|
|
|
public CacheStatus(Path baseDir, Path localLoadPath, Path subDir,
|
|
public CacheStatus(Path baseDir, Path localLoadPath, Path subDir,
|
|
- String uniqueString, String user) {
|
|
|
|
|
|
+ String uniqueString, String user, String key) {
|
|
super();
|
|
super();
|
|
this.localizedLoadPath = localLoadPath;
|
|
this.localizedLoadPath = localLoadPath;
|
|
this.refcount = 0;
|
|
this.refcount = 0;
|
|
@@ -579,8 +583,34 @@ public class TrackerDistributedCacheManager {
|
|
this.subDir = subDir;
|
|
this.subDir = subDir;
|
|
this.uniqueString = uniqueString;
|
|
this.uniqueString = uniqueString;
|
|
this.user = user;
|
|
this.user = user;
|
|
|
|
+ this.key = key;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ public synchronized void incRefCount() {
|
|
|
|
+ refcount += 1;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public void decRefCount() {
|
|
|
|
+ synchronized (cachedArchives) {
|
|
|
|
+ synchronized (this) {
|
|
|
|
+ refcount -= 1;
|
|
|
|
+ if(refcount <= 0) {
|
|
|
|
+ String key = this.key;
|
|
|
|
+ cachedArchives.remove(key);
|
|
|
|
+ cachedArchives.put(key, this);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public int getRefCount() {
|
|
|
|
+ return refcount;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public synchronized boolean isUsed() {
|
|
|
|
+ return refcount > 0;
|
|
|
|
+ }
|
|
|
|
+
|
|
Path getBaseDir(){
|
|
Path getBaseDir(){
|
|
return this.localizedBaseDir;
|
|
return this.localizedBaseDir;
|
|
}
|
|
}
|
|
@@ -917,49 +947,52 @@ public class TrackerDistributedCacheManager {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ // For holding the properties of each cache directory
|
|
|
|
+ private static class CacheDir {
|
|
|
|
+ long size;
|
|
|
|
+ long subdirs;
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* This class holds properties of each base directories and is responsible
|
|
* This class holds properties of each base directories and is responsible
|
|
* for clean up unused cache files in base directories.
|
|
* for clean up unused cache files in base directories.
|
|
*/
|
|
*/
|
|
protected class BaseDirManager {
|
|
protected class BaseDirManager {
|
|
-
|
|
|
|
- // For holding the properties of each cache directory
|
|
|
|
- private class CacheDir {
|
|
|
|
- long size;
|
|
|
|
- long subdirs;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private TreeMap<Path, BaseDirManager.CacheDir> properties =
|
|
|
|
- new TreeMap<Path, BaseDirManager.CacheDir>();
|
|
|
|
-
|
|
|
|
- private long getDirSize(Path p) {
|
|
|
|
- return properties.get(p).size;
|
|
|
|
- }
|
|
|
|
- private long getDirSubdirs(Path p) {
|
|
|
|
- return properties.get(p).subdirs;
|
|
|
|
- }
|
|
|
|
|
|
+ private TreeMap<Path, CacheDir> properties =
|
|
|
|
+ new TreeMap<Path, CacheDir>();
|
|
|
|
|
|
void checkAndCleanup() throws IOException {
|
|
void checkAndCleanup() throws IOException {
|
|
Collection<CacheStatus> toBeDeletedCache = new LinkedList<CacheStatus>();
|
|
Collection<CacheStatus> toBeDeletedCache = new LinkedList<CacheStatus>();
|
|
- Set<Path> toBeCleanedBaseDir = new HashSet<Path>();
|
|
|
|
|
|
+ HashMap<Path, CacheDir> toBeCleanedBaseDir =
|
|
|
|
+ new HashMap<Path, CacheDir>();
|
|
synchronized (properties) {
|
|
synchronized (properties) {
|
|
- for (Path baseDir : properties.keySet()) {
|
|
|
|
- if (allowedCacheSize < getDirSize(baseDir) ||
|
|
|
|
- allowedCacheSubdirs < getDirSubdirs(baseDir)) {
|
|
|
|
- toBeCleanedBaseDir.add(baseDir);
|
|
|
|
|
|
+ for (Map.Entry<Path, CacheDir> baseDir : properties.entrySet()) {
|
|
|
|
+ CacheDir baseDirCounts = baseDir.getValue();
|
|
|
|
+ if (allowedCacheSize < baseDirCounts.size ||
|
|
|
|
+ allowedCacheSubdirs < baseDirCounts.subdirs) {
|
|
|
|
+ CacheDir tcc = new CacheDir();
|
|
|
|
+ tcc.size = baseDirCounts.size - allowedCacheSizeCleanupGoal;
|
|
|
|
+ tcc.subdirs = baseDirCounts.subdirs - allowedCacheSubdirsCleanupGoal;
|
|
|
|
+ toBeCleanedBaseDir.put(baseDir.getKey(), tcc);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
// try deleting cache Status with refcount of zero
|
|
// try deleting cache Status with refcount of zero
|
|
synchronized (cachedArchives) {
|
|
synchronized (cachedArchives) {
|
|
- for (Iterator<String> it = cachedArchives.keySet().iterator();
|
|
|
|
- it.hasNext();) {
|
|
|
|
- String cacheId = it.next();
|
|
|
|
|
|
+ for(
|
|
|
|
+ Iterator<Map.Entry<String, CacheStatus>> it
|
|
|
|
+ = cachedArchives.entrySet().iterator();
|
|
|
|
+ it.hasNext(); ) {
|
|
|
|
+ Map.Entry<String, CacheStatus> entry = it.next();
|
|
|
|
+ String cacheId = entry.getKey();
|
|
CacheStatus cacheStatus = cachedArchives.get(cacheId);
|
|
CacheStatus cacheStatus = cachedArchives.get(cacheId);
|
|
- if (toBeCleanedBaseDir.contains(cacheStatus.getBaseDir())) {
|
|
|
|
|
|
+ CacheDir leftToClean = toBeCleanedBaseDir.get(cacheStatus.getBaseDir());
|
|
|
|
+ if (leftToClean != null && (leftToClean.size > 0 || leftToClean.subdirs > 0)) {
|
|
synchronized (cacheStatus) {
|
|
synchronized (cacheStatus) {
|
|
// if reference count is zero mark the cache for deletion
|
|
// if reference count is zero mark the cache for deletion
|
|
- if (cacheStatus.refcount == 0) {
|
|
|
|
|
|
+ if (!cacheStatus.isUsed()) {
|
|
|
|
+ leftToClean.size -= cacheStatus.size;
|
|
|
|
+ leftToClean.subdirs--;
|
|
// delete this cache entry from the global list
|
|
// delete this cache entry from the global list
|
|
// and mark the localized file for deletion
|
|
// and mark the localized file for deletion
|
|
toBeDeletedCache.add(cacheStatus);
|
|
toBeDeletedCache.add(cacheStatus);
|
|
@@ -1007,7 +1040,7 @@ public class TrackerDistributedCacheManager {
|
|
}
|
|
}
|
|
// decrement the size of the cache from baseDirSize
|
|
// decrement the size of the cache from baseDirSize
|
|
synchronized (baseDirManager.properties) {
|
|
synchronized (baseDirManager.properties) {
|
|
- BaseDirManager.CacheDir cacheDir = properties.get(cacheStatus.getBaseDir());
|
|
|
|
|
|
+ CacheDir cacheDir = properties.get(cacheStatus.getBaseDir());
|
|
if (cacheDir != null) {
|
|
if (cacheDir != null) {
|
|
cacheDir.size -= cacheStatus.size;
|
|
cacheDir.size -= cacheStatus.size;
|
|
cacheDir.subdirs--;
|
|
cacheDir.subdirs--;
|
|
@@ -1028,12 +1061,12 @@ public class TrackerDistributedCacheManager {
|
|
long cacheSize = cacheStatus.size;
|
|
long cacheSize = cacheStatus.size;
|
|
// decrement the size of the cache from baseDirSize
|
|
// decrement the size of the cache from baseDirSize
|
|
synchronized (baseDirManager.properties) {
|
|
synchronized (baseDirManager.properties) {
|
|
- BaseDirManager.CacheDir cacheDir = properties.get(cacheStatus.getBaseDir());
|
|
|
|
|
|
+ CacheDir cacheDir = properties.get(cacheStatus.getBaseDir());
|
|
if (cacheDir != null) {
|
|
if (cacheDir != null) {
|
|
cacheDir.size += cacheSize;
|
|
cacheDir.size += cacheSize;
|
|
cacheDir.subdirs++;
|
|
cacheDir.subdirs++;
|
|
} else {
|
|
} else {
|
|
- cacheDir = new BaseDirManager.CacheDir();
|
|
|
|
|
|
+ cacheDir = new CacheDir();
|
|
cacheDir.size = cacheSize;
|
|
cacheDir.size = cacheSize;
|
|
cacheDir.subdirs = 1;
|
|
cacheDir.subdirs = 1;
|
|
properties.put(cacheStatus.getBaseDir(), cacheDir);
|
|
properties.put(cacheStatus.getBaseDir(), cacheDir);
|