|
@@ -22,12 +22,12 @@ import java.io.IOException;
|
|
|
import java.net.URI;
|
|
|
import java.net.URISyntaxException;
|
|
|
import java.text.DateFormat;
|
|
|
+import java.util.Collection;
|
|
|
import java.util.Collections;
|
|
|
import java.util.Date;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
|
import java.util.Iterator;
|
|
|
-import java.util.List;
|
|
|
import java.util.LinkedList;
|
|
|
import java.util.Map;
|
|
|
import java.util.Random;
|
|
@@ -76,14 +76,6 @@ public class TrackerDistributedCacheManager {
|
|
|
private static final FsPermission PUBLIC_CACHE_OBJECT_PERM =
|
|
|
FsPermission.createImmutable((short) 0755);
|
|
|
|
|
|
- // For holding the properties of each cache directory
|
|
|
- static class CacheDir {
|
|
|
- long size;
|
|
|
- long subdirs;
|
|
|
- }
|
|
|
- private TreeMap<Path, CacheDir> baseDirProperties =
|
|
|
- new TreeMap<Path, CacheDir>();
|
|
|
-
|
|
|
// default total cache size (10GB)
|
|
|
private static final long DEFAULT_CACHE_SIZE = 10737418240L;
|
|
|
private static final long DEFAULT_CACHE_SUBDIR_LIMIT = 10000;
|
|
@@ -101,8 +93,8 @@ public class TrackerDistributedCacheManager {
|
|
|
|
|
|
private static final Random random = new Random();
|
|
|
|
|
|
- protected BaseDirManager baseDirManager = new BaseDirManager();
|
|
|
- protected CleanupThread cleanupThread;
|
|
|
+ BaseDirManager baseDirManager = new BaseDirManager();
|
|
|
+ CleanupThread cleanupThread;
|
|
|
|
|
|
public TrackerDistributedCacheManager(Configuration conf,
|
|
|
TaskController controller
|
|
@@ -119,6 +111,7 @@ public class TrackerDistributedCacheManager {
|
|
|
("mapreduce.tasktracker.local.cache.numberdirectories",
|
|
|
DEFAULT_CACHE_SUBDIR_LIMIT);
|
|
|
this.taskController = controller;
|
|
|
+ this.cleanupThread = new CleanupThread(conf);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -200,7 +193,7 @@ public class TrackerDistributedCacheManager {
|
|
|
|
|
|
// Increase the size and sub directory count of the cache
|
|
|
// from baseDirSize and baseDirNumberSubDir.
|
|
|
- addCacheInfoUpdate(lcacheStatus);
|
|
|
+ baseDirManager.addCacheInfoUpdate(lcacheStatus);
|
|
|
}
|
|
|
}
|
|
|
lcacheStatus.initComplete();
|
|
@@ -209,27 +202,6 @@ public class TrackerDistributedCacheManager {
|
|
|
lcacheStatus, fileStatus, isArchive);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- // try deleting stuff if you can
|
|
|
- long size = 0;
|
|
|
- long numberSubdirs = 0;
|
|
|
- synchronized (lcacheStatus) {
|
|
|
- synchronized (baseDirProperties) {
|
|
|
- CacheDir cacheDir = baseDirProperties.get(lcacheStatus.getBaseDir());
|
|
|
- if (cacheDir != null) {
|
|
|
- size = cacheDir.size;
|
|
|
- numberSubdirs = cacheDir.subdirs;
|
|
|
- } else {
|
|
|
- LOG.warn("Cannot find size and number of subdirectories of" +
|
|
|
- " baseDir: " + lcacheStatus.getBaseDir());
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- if (allowedCacheSize < size || allowedCacheSubdirs < numberSubdirs) {
|
|
|
- // try some cache deletions
|
|
|
- compactCache(conf);
|
|
|
- }
|
|
|
} catch (IOException ie) {
|
|
|
synchronized (lcacheStatus) {
|
|
|
// release this cache
|
|
@@ -260,7 +232,7 @@ public class TrackerDistributedCacheManager {
|
|
|
if (size != 0) {
|
|
|
synchronized (status) {
|
|
|
status.size = size;
|
|
|
- addCacheInfoUpdate(status);
|
|
|
+ baseDirManager.addCacheInfoUpdate(status);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -292,54 +264,6 @@ public class TrackerDistributedCacheManager {
|
|
|
return user;
|
|
|
}
|
|
|
|
|
|
-
|
|
|
- // To delete the caches which have a refcount of zero
|
|
|
-
|
|
|
- private void compactCache(Configuration conf) throws IOException {
|
|
|
- List<CacheStatus> deleteList = new LinkedList<CacheStatus>();
|
|
|
- // try deleting cache Status with refcount of zero
|
|
|
- synchronized (cachedArchives) {
|
|
|
- for (Iterator<String> it = cachedArchives.keySet().iterator();
|
|
|
- it.hasNext();) {
|
|
|
- String cacheId = it.next();
|
|
|
- CacheStatus lcacheStatus = cachedArchives.get(cacheId);
|
|
|
- // if reference count is zero
|
|
|
- // mark the cache for deletion
|
|
|
- if (lcacheStatus.refcount == 0) {
|
|
|
- // delete this cache entry from the global list
|
|
|
- // and mark the localized file for deletion
|
|
|
- deleteList.add(lcacheStatus);
|
|
|
- it.remove();
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // do the deletion, after releasing the global lock
|
|
|
- for (CacheStatus lcacheStatus : deleteList) {
|
|
|
- synchronized (lcacheStatus) {
|
|
|
- Path potentialDeletee = lcacheStatus.localizedLoadPath;
|
|
|
- Path localizedDir = lcacheStatus.getLocalizedUniqueDir();
|
|
|
- if (lcacheStatus.user == null) {
|
|
|
- LOG.info("Deleted path " + localizedDir);
|
|
|
- try {
|
|
|
- localFs.delete(localizedDir, true);
|
|
|
- } catch (IOException e) {
|
|
|
- LOG.warn("Could not delete distributed cache empty directory "
|
|
|
- + localizedDir, e);
|
|
|
- }
|
|
|
- } else {
|
|
|
- LOG.info("Deleted path " + localizedDir + " as " + lcacheStatus.user);
|
|
|
- String base = lcacheStatus.getBaseDir().toString();
|
|
|
- String userDir = TaskTracker.getUserDir(lcacheStatus.user);
|
|
|
- int skip = base.length() + 1 + userDir.length() + 1;
|
|
|
- String relative = localizedDir.toString().substring(skip);
|
|
|
- taskController.deleteAsUser(lcacheStatus.user, relative);
|
|
|
- }
|
|
|
- deleteCacheInfoUpdate(lcacheStatus);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
/*
|
|
|
* Returns the relative path of the dir this cache will be localized in
|
|
|
* relative path that this cache will be localized in. For
|
|
@@ -541,7 +465,7 @@ public class TrackerDistributedCacheManager {
|
|
|
|
|
|
// Increase the size and sub directory count of the cache
|
|
|
// from baseDirSize and baseDirNumberSubDir.
|
|
|
- addCacheInfoUpdate(cacheStatus);
|
|
|
+ baseDirManager.addCacheInfoUpdate(cacheStatus);
|
|
|
|
|
|
LOG.info(String.format("Cached %s as %s",
|
|
|
cache.toString(), cacheStatus.localizedLoadPath));
|
|
@@ -617,28 +541,31 @@ public class TrackerDistributedCacheManager {
|
|
|
}
|
|
|
|
|
|
static class CacheStatus {
|
|
|
- // the local load path of this cache
|
|
|
- Path localizedLoadPath;
|
|
|
-
|
|
|
- //the base dir where the cache lies
|
|
|
- Path localizedBaseDir;
|
|
|
-
|
|
|
- //the size of this cache
|
|
|
- long size;
|
|
|
-
|
|
|
- // number of instances using this cache
|
|
|
- int refcount;
|
|
|
-
|
|
|
- // is it initialized ?
|
|
|
- boolean inited = false;
|
|
|
-
|
|
|
+ //
|
|
|
+ // This field should be accessed under global cachedArchives lock.
|
|
|
+ //
|
|
|
+ int refcount; // number of instances using this cache
|
|
|
+
|
|
|
+ //
|
|
|
+ // The following two fields should be accessed under
|
|
|
+ // individual cacheStatus lock.
|
|
|
+ //
|
|
|
+ long size; //the size of this cache.
|
|
|
+ boolean inited = false; // is it initialized ?
|
|
|
+
|
|
|
+ //
|
|
|
+ // The following five fields are Immutable.
|
|
|
+ //
|
|
|
+
|
|
|
// The sub directory (tasktracker/archive or tasktracker/user/archive),
|
|
|
// under which the file will be localized
|
|
|
Path subDir;
|
|
|
-
|
|
|
// unique string used in the construction of local load path
|
|
|
String uniqueString;
|
|
|
-
|
|
|
+ // the local load path of this cache
|
|
|
+ Path localizedLoadPath;
|
|
|
+ //the base dir where the cache lies
|
|
|
+ Path localizedBaseDir;
|
|
|
// The user that owns the cache entry or null if it is public
|
|
|
final String user;
|
|
|
|
|
@@ -943,10 +870,11 @@ public class TrackerDistributedCacheManager {
|
|
|
return path;
|
|
|
}
|
|
|
|
|
|
+
|
|
|
/**
|
|
|
* A thread to check and cleanup the unused files periodically
|
|
|
*/
|
|
|
- protected class CleanupThread extends Thread {
|
|
|
+ private class CleanupThread extends Thread {
|
|
|
// How often do we check if we need to clean up cache files?
|
|
|
private long cleanUpCheckPeriod = 60000L; // 1 minute
|
|
|
public CleanupThread(Configuration conf) {
|
|
@@ -954,7 +882,6 @@ public class TrackerDistributedCacheManager {
|
|
|
conf.getLong("mapreduce.tasktracker.distributedcache.checkperiod",
|
|
|
cleanUpCheckPeriod);
|
|
|
}
|
|
|
-
|
|
|
private volatile boolean running = true;
|
|
|
|
|
|
public void stopRunning() {
|
|
@@ -967,33 +894,19 @@ public class TrackerDistributedCacheManager {
|
|
|
try {
|
|
|
Thread.sleep(cleanUpCheckPeriod);
|
|
|
baseDirManager.checkAndCleanup();
|
|
|
- } catch (IOException e) {
|
|
|
+ } catch (Exception e) {
|
|
|
LOG.error("Exception in DistributedCache CleanupThread.", e);
|
|
|
- } catch(InterruptedException e) {
|
|
|
- LOG.info("Cleanup...",e);
|
|
|
- //To force us to exit cleanly
|
|
|
- running = false;
|
|
|
- } catch (Throwable t) {
|
|
|
- exitTaskTracker(t);
|
|
|
+ // This thread should keep running and never crash.
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- /**
|
|
|
- * Exit the task tracker because of a fatal error.
|
|
|
- */
|
|
|
- protected void exitTaskTracker(Throwable t) {
|
|
|
- LOG.fatal("Distributed Cache cleanup thread received runtime exception." +
|
|
|
- " Exiting the TaskTracker", t);
|
|
|
- Runtime.getRuntime().exit(-1);
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* This class holds properties of each base directories and is responsible
|
|
|
* for clean up unused cache files in base directories.
|
|
|
*/
|
|
|
- protected class BaseDirManager {
|
|
|
+ private class BaseDirManager {
|
|
|
|
|
|
// For holding the properties of each cache directory
|
|
|
private class CacheDir {
|
|
@@ -1072,44 +985,60 @@ public class TrackerDistributedCacheManager {
|
|
|
* and baseDirNumberSubDir. Have to lock lcacheStatus before calling this.
|
|
|
* @param cacheStatus cache status of the cache is deleted
|
|
|
*/
|
|
|
- private void deleteCacheInfoUpdate(CacheStatus cacheStatus) {
|
|
|
+ public void deleteCacheInfoUpdate(CacheStatus cacheStatus) {
|
|
|
if (!cacheStatus.inited) {
|
|
|
// if it is not created yet, do nothing.
|
|
|
return;
|
|
|
}
|
|
|
// decrement the size of the cache from baseDirSize
|
|
|
- synchronized (baseDirProperties) {
|
|
|
- CacheDir cacheDir = baseDirProperties.get(cacheStatus.getBaseDir());
|
|
|
+ synchronized (baseDirManager.properties) {
|
|
|
+ BaseDirManager.CacheDir cacheDir = properties.get(cacheStatus.getBaseDir());
|
|
|
if (cacheDir != null) {
|
|
|
cacheDir.size -= cacheStatus.size;
|
|
|
cacheDir.subdirs--;
|
|
|
} else {
|
|
|
LOG.warn("Cannot find size and number of subdirectories of" +
|
|
|
- " baseDir: " + cacheStatus.getBaseDir());
|
|
|
+ " baseDir: " + cacheStatus.getBaseDir());
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* Update the maps baseDirSize and baseDirNumberSubDir when adding cache.
|
|
|
* Increase the size and sub directory count of the cache from baseDirSize
|
|
|
* and baseDirNumberSubDir. Have to lock lcacheStatus before calling this.
|
|
|
* @param cacheStatus cache status of the cache is added
|
|
|
*/
|
|
|
- private void addCacheInfoUpdate(CacheStatus cacheStatus) {
|
|
|
+ public void addCacheInfoUpdate(CacheStatus cacheStatus) {
|
|
|
long cacheSize = cacheStatus.size;
|
|
|
// decrement the size of the cache from baseDirSize
|
|
|
- synchronized (baseDirProperties) {
|
|
|
- CacheDir cacheDir = baseDirProperties.get(cacheStatus.getBaseDir());
|
|
|
+ synchronized (baseDirManager.properties) {
|
|
|
+ BaseDirManager.CacheDir cacheDir = properties.get(cacheStatus.getBaseDir());
|
|
|
if (cacheDir != null) {
|
|
|
cacheDir.size += cacheSize;
|
|
|
cacheDir.subdirs++;
|
|
|
} else {
|
|
|
- cacheDir = new CacheDir();
|
|
|
+ cacheDir = new BaseDirManager.CacheDir();
|
|
|
cacheDir.size = cacheSize;
|
|
|
cacheDir.subdirs = 1;
|
|
|
- baseDirProperties.put(cacheStatus.getBaseDir(), cacheDir);
|
|
|
+ properties.put(cacheStatus.getBaseDir(), cacheDir);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Start the background thread
|
|
|
+ */
|
|
|
+ public void startCleanupThread() {
|
|
|
+ this.cleanupThread.start();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Stop the background thread
|
|
|
+ */
|
|
|
+ public void stopCleanupThread() {
|
|
|
+ cleanupThread.stopRunning();
|
|
|
+ cleanupThread.interrupt();
|
|
|
+ }
|
|
|
}
|