|
@@ -100,6 +100,9 @@ public class TrackerDistributedCacheManager {
|
|
|
private Configuration trackerConf;
|
|
|
|
|
|
private static final Random random = new Random();
|
|
|
+
|
|
|
+ protected BaseDirManager baseDirManager = new BaseDirManager();
|
|
|
+ protected CleanupThread cleanupThread;
|
|
|
|
|
|
public TrackerDistributedCacheManager(Configuration conf,
|
|
|
TaskController controller
|
|
@@ -940,6 +943,130 @@ public class TrackerDistributedCacheManager {
|
|
|
return path;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * A thread to check and cleanup the unused files periodically
|
|
|
+ */
|
|
|
+ protected class CleanupThread extends Thread {
|
|
|
+ // How often do we check if we need to clean up cache files?
|
|
|
+ private long cleanUpCheckPeriod = 60000L; // 1 minute
|
|
|
+ public CleanupThread(Configuration conf) {
|
|
|
+ cleanUpCheckPeriod =
|
|
|
+ conf.getLong("mapreduce.tasktracker.distributedcache.checkperiod",
|
|
|
+ cleanUpCheckPeriod);
|
|
|
+ }
|
|
|
+
|
|
|
+ private volatile boolean running = true;
|
|
|
+
|
|
|
+ public void stopRunning() {
|
|
|
+ running = false;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ while (running) {
|
|
|
+ try {
|
|
|
+ Thread.sleep(cleanUpCheckPeriod);
|
|
|
+ baseDirManager.checkAndCleanup();
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.error("Exception in DistributedCache CleanupThread.", e);
|
|
|
+ } catch(InterruptedException e) {
|
|
|
+ LOG.info("Cleanup...",e);
|
|
|
+ //To force us to exit cleanly
|
|
|
+ running = false;
|
|
|
+ } catch (Throwable t) {
|
|
|
+ exitTaskTracker(t);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Exit the task tracker because of a fatal error.
|
|
|
+ */
|
|
|
+ protected void exitTaskTracker(Throwable t) {
|
|
|
+ LOG.fatal("Distributed Cache cleanup thread received runtime exception." +
|
|
|
+ " Exiting the TaskTracker", t);
|
|
|
+ Runtime.getRuntime().exit(-1);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * This class holds properties of each base directories and is responsible
|
|
|
+ * for clean up unused cache files in base directories.
|
|
|
+ */
|
|
|
+ protected class BaseDirManager {
|
|
|
+
|
|
|
+ // For holding the properties of each cache directory
|
|
|
+ private class CacheDir {
|
|
|
+ long size;
|
|
|
+ long subdirs;
|
|
|
+ }
|
|
|
+
|
|
|
+ private TreeMap<Path, BaseDirManager.CacheDir> properties =
|
|
|
+ new TreeMap<Path, BaseDirManager.CacheDir>();
|
|
|
+
|
|
|
+ private long getDirSize(Path p) {
|
|
|
+ return properties.get(p).size;
|
|
|
+ }
|
|
|
+ private long getDirSubdirs(Path p) {
|
|
|
+ return properties.get(p).subdirs;
|
|
|
+ }
|
|
|
+
|
|
|
+ void checkAndCleanup() throws IOException {
|
|
|
+ Collection<CacheStatus> toBeDeletedCache = new LinkedList<CacheStatus>();
|
|
|
+ Set<Path> toBeCleanedBaseDir = new HashSet<Path>();
|
|
|
+ synchronized (properties) {
|
|
|
+ for (Path baseDir : properties.keySet()) {
|
|
|
+ if (allowedCacheSize < getDirSize(baseDir) ||
|
|
|
+ allowedCacheSubdirs < getDirSubdirs(baseDir)) {
|
|
|
+ toBeCleanedBaseDir.add(baseDir);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // try deleting cache Status with refcount of zero
|
|
|
+ synchronized (cachedArchives) {
|
|
|
+ for (Iterator<String> it = cachedArchives.keySet().iterator();
|
|
|
+ it.hasNext();) {
|
|
|
+ String cacheId = it.next();
|
|
|
+ CacheStatus cacheStatus = cachedArchives.get(cacheId);
|
|
|
+ if (toBeCleanedBaseDir.contains(cacheStatus.getBaseDir())) {
|
|
|
+ synchronized (cacheStatus) {
|
|
|
+ // if reference count is zero mark the cache for deletion
|
|
|
+ if (cacheStatus.refcount == 0) {
|
|
|
+ // delete this cache entry from the global list
|
|
|
+ // and mark the localized file for deletion
|
|
|
+ toBeDeletedCache.add(cacheStatus);
|
|
|
+ it.remove();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // do the deletion, after releasing the global lock
|
|
|
+ for (CacheStatus cacheStatus : toBeDeletedCache) {
|
|
|
+ synchronized (cacheStatus) {
|
|
|
+ Path localizedDir = cacheStatus.getLocalizedUniqueDir();
|
|
|
+ if (cacheStatus.user == null) {
|
|
|
+ TrackerDistributedCacheManager.LOG.info("Deleted path " + localizedDir);
|
|
|
+ try {
|
|
|
+ localFs.delete(localizedDir, true);
|
|
|
+ } catch (IOException e) {
|
|
|
+ TrackerDistributedCacheManager.LOG.warn("Could not delete distributed cache empty directory "
|
|
|
+ + localizedDir, e);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ TrackerDistributedCacheManager.LOG.info("Deleted path " + localizedDir + " as " + cacheStatus.user);
|
|
|
+ String base = cacheStatus.getBaseDir().toString();
|
|
|
+ String userDir = TaskTracker.getUserDir(cacheStatus.user);
|
|
|
+ int skip = base.length() + 1 + userDir.length() + 1;
|
|
|
+ String relative = localizedDir.toString().substring(skip);
|
|
|
+ taskController.deleteAsUser(cacheStatus.user, relative);
|
|
|
+ }
|
|
|
+ deleteCacheInfoUpdate(cacheStatus);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Decrement the size and sub directory count of the cache from baseDirSize
|
|
|
* and baseDirNumberSubDir. Have to lock lcacheStatus before calling this.
|