|
@@ -124,8 +124,6 @@ public class DistributedCache {
|
|
|
private static final Log LOG =
|
|
|
LogFactory.getLog(DistributedCache.class);
|
|
|
|
|
|
- private static Random random = new Random();
|
|
|
-
|
|
|
/**
|
|
|
* Get the locally cached file or archive; it could either be
|
|
|
* previously cached (and valid) or copy it from the {@link FileSystem} now.
|
|
@@ -157,8 +155,7 @@ public class DistributedCache {
|
|
|
Path currentWorkDir)
|
|
|
throws IOException {
|
|
|
return getLocalCache(cache, conf, baseDir, fileStatus, isArchive,
|
|
|
- confFileStamp, currentWorkDir, true,
|
|
|
- new LocalDirAllocator("mapred.local.dir"));
|
|
|
+ confFileStamp, currentWorkDir, true);
|
|
|
}
|
|
|
/**
|
|
|
* Get the locally cached file or archive; it could either be
|
|
@@ -169,7 +166,7 @@ public class DistributedCache {
|
|
|
* or hostname:port is provided the file is assumed to be in the filesystem
|
|
|
* being used in the Configuration
|
|
|
* @param conf The Confguration file which contains the filesystem
|
|
|
- * @param subDir The sub cache Dir where you want to localize the files/archives
|
|
|
+ * @param baseDir The base cache Dir where you wnat to localize the files/archives
|
|
|
* @param fileStatus The file status on the dfs.
|
|
|
* @param isArchive if the cache is an archive or a file. In case it is an
|
|
|
* archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will
|
|
@@ -184,59 +181,39 @@ public class DistributedCache {
|
|
|
* @param honorSymLinkConf if this is false, then the symlinks are not
|
|
|
* created even if conf says so (this is required for an optimization in task
|
|
|
* launches
|
|
|
- * @param lDirAllocator LocalDirAllocator of the tracker
|
|
|
* @return the path to directory where the archives are unjarred in case of archives,
|
|
|
* the path to the file where the file is copied locally
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
public static Path getLocalCache(URI cache, Configuration conf,
|
|
|
- Path subDir, FileStatus fileStatus,
|
|
|
+ Path baseDir, FileStatus fileStatus,
|
|
|
boolean isArchive, long confFileStamp,
|
|
|
- Path currentWorkDir, boolean honorSymLinkConf,
|
|
|
- LocalDirAllocator lDirAllocator)
|
|
|
+ Path currentWorkDir, boolean honorSymLinkConf)
|
|
|
throws IOException {
|
|
|
- String key = getKey(cache, conf, confFileStamp);
|
|
|
+ String cacheId = makeRelative(cache, conf);
|
|
|
CacheStatus lcacheStatus;
|
|
|
Path localizedPath;
|
|
|
synchronized (cachedArchives) {
|
|
|
- lcacheStatus = cachedArchives.get(key);
|
|
|
+ lcacheStatus = cachedArchives.get(cacheId);
|
|
|
if (lcacheStatus == null) {
|
|
|
// was never localized
|
|
|
- String cachePath = new Path (subDir,
|
|
|
- new Path(String.valueOf(random.nextLong()),
|
|
|
- makeRelative(cache, conf))).toString();
|
|
|
- Path localPath = lDirAllocator.getLocalPathForWrite(cachePath,
|
|
|
- fileStatus.getLen(), conf);
|
|
|
- lcacheStatus = new CacheStatus(
|
|
|
- new Path(localPath.toString().replace(cachePath, "")), localPath);
|
|
|
- cachedArchives.put(key, lcacheStatus);
|
|
|
+ lcacheStatus = new CacheStatus(baseDir, new Path(baseDir, new Path(cacheId)));
|
|
|
+ cachedArchives.put(cacheId, lcacheStatus);
|
|
|
}
|
|
|
- lcacheStatus.refcount++;
|
|
|
- }
|
|
|
-
|
|
|
- synchronized (lcacheStatus) {
|
|
|
- if (!lcacheStatus.isInited()) {
|
|
|
- localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus,
|
|
|
- fileStatus, isArchive);
|
|
|
- lcacheStatus.initComplete();
|
|
|
- } else {
|
|
|
- localizedPath = checkCacheStatusValidity(conf, cache, confFileStamp,
|
|
|
- lcacheStatus, fileStatus, isArchive);
|
|
|
+
|
|
|
+ synchronized (lcacheStatus) {
|
|
|
+ localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus,
|
|
|
+ fileStatus, isArchive, currentWorkDir, honorSymLinkConf);
|
|
|
+ lcacheStatus.refcount++;
|
|
|
}
|
|
|
- createSymlink(conf, cache, lcacheStatus, isArchive,
|
|
|
- currentWorkDir, honorSymLinkConf);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// try deleting stuff if you can
|
|
|
long size = 0;
|
|
|
- synchronized (lcacheStatus) {
|
|
|
- synchronized (baseDirSize) {
|
|
|
- Long get = baseDirSize.get(lcacheStatus.getBaseDir());
|
|
|
- if ( get != null ) {
|
|
|
- size = get.longValue();
|
|
|
- } else {
|
|
|
- LOG.warn("Cannot find size of baseDir: " + lcacheStatus.getBaseDir());
|
|
|
- }
|
|
|
+ synchronized (baseDirSize) {
|
|
|
+ Long get = baseDirSize.get(baseDir);
|
|
|
+ if ( get != null ) {
|
|
|
+ size = get.longValue();
|
|
|
}
|
|
|
}
|
|
|
// setting the cache size to a default of 10GB
|
|
@@ -291,52 +268,39 @@ public class DistributedCache {
|
|
|
* is contained in.
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- public static void releaseCache(URI cache, Configuration conf, long timeStamp)
|
|
|
+ public static void releaseCache(URI cache, Configuration conf)
|
|
|
throws IOException {
|
|
|
- String cacheId = getKey(cache, conf, timeStamp);
|
|
|
+ String cacheId = makeRelative(cache, conf);
|
|
|
synchronized (cachedArchives) {
|
|
|
CacheStatus lcacheStatus = cachedArchives.get(cacheId);
|
|
|
- if (lcacheStatus == null) {
|
|
|
- LOG.warn("Cannot find localized cache: " + cache +
|
|
|
- " (key: " + cacheId + ") in releaseCache!");
|
|
|
+ if (lcacheStatus == null)
|
|
|
return;
|
|
|
+ synchronized (lcacheStatus) {
|
|
|
+ lcacheStatus.refcount--;
|
|
|
}
|
|
|
- lcacheStatus.refcount--;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// To delete the caches which have a refcount of zero
|
|
|
|
|
|
private static void deleteCache(Configuration conf) throws IOException {
|
|
|
- Set<CacheStatus> deleteSet = new HashSet<CacheStatus>();
|
|
|
// try deleting cache Status with refcount of zero
|
|
|
synchronized (cachedArchives) {
|
|
|
for (Iterator it = cachedArchives.keySet().iterator(); it.hasNext();) {
|
|
|
String cacheId = (String) it.next();
|
|
|
CacheStatus lcacheStatus = cachedArchives.get(cacheId);
|
|
|
- if (lcacheStatus.refcount == 0) {
|
|
|
- // delete this cache entry from the global list
|
|
|
- // and mark the localized file for deletion
|
|
|
- deleteSet.add(lcacheStatus);
|
|
|
- it.remove();
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // do the deletion, after releasing the global lock
|
|
|
- for (CacheStatus lcacheStatus : deleteSet) {
|
|
|
- synchronized (lcacheStatus) {
|
|
|
- FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
|
|
|
- LOG.info("Deleted path " + lcacheStatus.localLoadPath);
|
|
|
- // decrement the size of the cache from baseDirSize
|
|
|
- synchronized (baseDirSize) {
|
|
|
- Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
|
|
|
- if ( dirSize != null ) {
|
|
|
- dirSize -= lcacheStatus.size;
|
|
|
- baseDirSize.put(lcacheStatus.baseDir, dirSize);
|
|
|
- } else {
|
|
|
- LOG.warn("Cannot find record of the baseDir: " +
|
|
|
- lcacheStatus.baseDir + " during delete!");
|
|
|
+ synchronized (lcacheStatus) {
|
|
|
+ if (lcacheStatus.refcount == 0) {
|
|
|
+ // delete this cache entry
|
|
|
+ FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
|
|
|
+ synchronized (baseDirSize) {
|
|
|
+ Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
|
|
|
+ if ( dirSize != null ) {
|
|
|
+ dirSize -= lcacheStatus.size;
|
|
|
+ baseDirSize.put(lcacheStatus.baseDir, dirSize);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ it.remove();
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -369,108 +333,128 @@ public class DistributedCache {
|
|
|
return path;
|
|
|
}
|
|
|
|
|
|
- static String getKey(URI cache, Configuration conf, long timeStamp)
|
|
|
- throws IOException {
|
|
|
- return makeRelative(cache, conf) + String.valueOf(timeStamp);
|
|
|
+ private static Path cacheFilePath(Path p) {
|
|
|
+ return new Path(p, p.getName());
|
|
|
}
|
|
|
|
|
|
- private static Path checkCacheStatusValidity(Configuration conf,
|
|
|
- URI cache, long confFileStamp,
|
|
|
- CacheStatus cacheStatus,
|
|
|
- FileStatus fileStatus,
|
|
|
- boolean isArchive
|
|
|
- ) throws IOException {
|
|
|
- FileSystem fs = FileSystem.get(cache, conf);
|
|
|
- // Has to be
|
|
|
- if (!ifExistsAndFresh(conf, fs, cache, confFileStamp,
|
|
|
- cacheStatus, fileStatus)) {
|
|
|
- throw new IOException("Stale cache file: " + cacheStatus.localLoadPath +
|
|
|
- " for cache-file: " + cache);
|
|
|
- }
|
|
|
- LOG.info(String.format("Using existing cache of %s->%s",
|
|
|
- cache.toString(), cacheStatus.localLoadPath));
|
|
|
- return cacheStatus.localLoadPath;
|
|
|
- }
|
|
|
-
|
|
|
- private static void createSymlink(Configuration conf, URI cache,
|
|
|
- CacheStatus cacheStatus, boolean isArchive,
|
|
|
- Path currentWorkDir, boolean honorSymLinkConf) throws IOException {
|
|
|
- boolean doSymlink = honorSymLinkConf && DistributedCache.getSymlink(conf);
|
|
|
- if(cache.getFragment() == null) {
|
|
|
- doSymlink = false;
|
|
|
- }
|
|
|
- String link =
|
|
|
- currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
|
|
|
- File flink = new File(link);
|
|
|
- if (doSymlink){
|
|
|
- if (!flink.exists()) {
|
|
|
- FileUtil.symLink(cacheStatus.localLoadPath.toString(), link);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
// the method which actually copies the caches locally and unjars/unzips them
|
|
|
// and does chmod for the files
|
|
|
private static Path localizeCache(Configuration conf,
|
|
|
URI cache, long confFileStamp,
|
|
|
CacheStatus cacheStatus,
|
|
|
FileStatus fileStatus,
|
|
|
- boolean isArchive)
|
|
|
+ boolean isArchive,
|
|
|
+ Path currentWorkDir,boolean honorSymLinkConf)
|
|
|
throws IOException {
|
|
|
+ boolean doSymlink = honorSymLinkConf && getSymlink(conf);
|
|
|
+ if(cache.getFragment() == null) {
|
|
|
+ doSymlink = false;
|
|
|
+ }
|
|
|
FileSystem fs = getFileSystem(cache, conf);
|
|
|
- FileSystem localFs = FileSystem.getLocal(conf);
|
|
|
- Path parchive = null;
|
|
|
- if (isArchive) {
|
|
|
- parchive = new Path(cacheStatus.localLoadPath,
|
|
|
- new Path(cacheStatus.localLoadPath.getName()));
|
|
|
+ String link = currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
|
|
|
+ File flink = new File(link);
|
|
|
+ if (ifExistsAndFresh(conf, fs, cache, confFileStamp,
|
|
|
+ cacheStatus, fileStatus)) {
|
|
|
+ if (isArchive) {
|
|
|
+ if (doSymlink){
|
|
|
+ if (!flink.exists())
|
|
|
+ FileUtil.symLink(cacheStatus.localLoadPath.toString(),
|
|
|
+ link);
|
|
|
+ }
|
|
|
+ return cacheStatus.localLoadPath;
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ if (doSymlink){
|
|
|
+ if (!flink.exists())
|
|
|
+ FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(),
|
|
|
+ link);
|
|
|
+ }
|
|
|
+ return cacheFilePath(cacheStatus.localLoadPath);
|
|
|
+ }
|
|
|
} else {
|
|
|
- parchive = cacheStatus.localLoadPath;
|
|
|
- }
|
|
|
+ // remove the old archive
|
|
|
+ // if the old archive cannot be removed since it is being used by another
|
|
|
+ // job
|
|
|
+ // return null
|
|
|
+ if (cacheStatus.refcount > 1 && (cacheStatus.currentStatus == true))
|
|
|
+ throw new IOException("Cache " + cacheStatus.localLoadPath.toString()
|
|
|
+ + " is in use and cannot be refreshed");
|
|
|
+
|
|
|
+ FileSystem localFs = FileSystem.getLocal(conf);
|
|
|
+ localFs.delete(cacheStatus.localLoadPath, true);
|
|
|
+ synchronized (baseDirSize) {
|
|
|
+ Long dirSize = baseDirSize.get(cacheStatus.baseDir);
|
|
|
+ if ( dirSize != null ) {
|
|
|
+ dirSize -= cacheStatus.size;
|
|
|
+ baseDirSize.put(cacheStatus.baseDir, dirSize);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ Path parchive = new Path(cacheStatus.localLoadPath,
|
|
|
+ new Path(cacheStatus.localLoadPath.getName()));
|
|
|
+
|
|
|
+ if (!localFs.mkdirs(cacheStatus.localLoadPath)) {
|
|
|
+ throw new IOException("Mkdirs failed to create directory " +
|
|
|
+ cacheStatus.localLoadPath.toString());
|
|
|
+ }
|
|
|
|
|
|
- if (!localFs.mkdirs(parchive.getParent())) {
|
|
|
- throw new IOException("Mkdirs failed to create directory " +
|
|
|
- cacheStatus.localLoadPath.toString());
|
|
|
- }
|
|
|
- String cacheId = cache.getPath();
|
|
|
- fs.copyToLocalFile(new Path(cacheId), parchive);
|
|
|
- if (isArchive) {
|
|
|
- String tmpArchive = parchive.toString().toLowerCase();
|
|
|
- File srcFile = new File(parchive.toString());
|
|
|
- File destDir = new File(parchive.getParent().toString());
|
|
|
- if (tmpArchive.endsWith(".jar")) {
|
|
|
- RunJar.unJar(srcFile, destDir);
|
|
|
- } else if (tmpArchive.endsWith(".zip")) {
|
|
|
- FileUtil.unZip(srcFile, destDir);
|
|
|
- } else if (isTarFile(tmpArchive)) {
|
|
|
- FileUtil.unTar(srcFile, destDir);
|
|
|
+ String cacheId = cache.getPath();
|
|
|
+ fs.copyToLocalFile(new Path(cacheId), parchive);
|
|
|
+ if (isArchive) {
|
|
|
+ String tmpArchive = parchive.toString().toLowerCase();
|
|
|
+ File srcFile = new File(parchive.toString());
|
|
|
+ File destDir = new File(parchive.getParent().toString());
|
|
|
+ if (tmpArchive.endsWith(".jar")) {
|
|
|
+ RunJar.unJar(srcFile, destDir);
|
|
|
+ } else if (tmpArchive.endsWith(".zip")) {
|
|
|
+ FileUtil.unZip(srcFile, destDir);
|
|
|
+ } else if (isTarFile(tmpArchive)) {
|
|
|
+ FileUtil.unTar(srcFile, destDir);
|
|
|
+ }
|
|
|
+ // else will not do anyhting
|
|
|
+ // and copy the file into the dir as it is
|
|
|
}
|
|
|
- // else will not do anyhting
|
|
|
- // and copy the file into the dir as it is
|
|
|
- }
|
|
|
|
|
|
- long cacheSize = FileUtil.getDU(new File(parchive.getParent().toString()));
|
|
|
- cacheStatus.size = cacheSize;
|
|
|
- synchronized (baseDirSize) {
|
|
|
- Long dirSize = baseDirSize.get(cacheStatus.baseDir);
|
|
|
- if (dirSize == null) {
|
|
|
- dirSize = Long.valueOf(cacheSize);
|
|
|
- } else {
|
|
|
- dirSize += cacheSize;
|
|
|
- }
|
|
|
- baseDirSize.put(cacheStatus.baseDir, dirSize);
|
|
|
- }
|
|
|
+ long cacheSize = FileUtil.getDU(new File(parchive.getParent().toString()));
|
|
|
+ cacheStatus.size = cacheSize;
|
|
|
+ synchronized (baseDirSize) {
|
|
|
+ Long dirSize = baseDirSize.get(cacheStatus.baseDir);
|
|
|
+ if( dirSize == null ) {
|
|
|
+ dirSize = Long.valueOf(cacheSize);
|
|
|
+ } else {
|
|
|
+ dirSize += cacheSize;
|
|
|
+ }
|
|
|
+ baseDirSize.put(cacheStatus.baseDir, dirSize);
|
|
|
+ }
|
|
|
|
|
|
- // do chmod here
|
|
|
- try {
|
|
|
- //Setting recursive permission to grant everyone read and execute
|
|
|
- FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx",true);
|
|
|
- } catch(InterruptedException e) {
|
|
|
+ // do chmod here
|
|
|
+ try {
|
|
|
+ //Setting recursive permission to grant everyone read and execute
|
|
|
+ FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx",true);
|
|
|
+ } catch(InterruptedException e) {
|
|
|
LOG.warn("Exception in chmod" + e.toString());
|
|
|
- }
|
|
|
+ }
|
|
|
|
|
|
- // update cacheStatus to reflect the newly cached file
|
|
|
- cacheStatus.mtime = getTimestamp(conf, cache);
|
|
|
- return cacheStatus.localLoadPath;
|
|
|
+ // update cacheStatus to reflect the newly cached file
|
|
|
+ cacheStatus.currentStatus = true;
|
|
|
+ cacheStatus.mtime = getTimestamp(conf, cache);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (isArchive){
|
|
|
+ if (doSymlink){
|
|
|
+ if (!flink.exists())
|
|
|
+ FileUtil.symLink(cacheStatus.localLoadPath.toString(),
|
|
|
+ link);
|
|
|
+ }
|
|
|
+ return cacheStatus.localLoadPath;
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ if (doSymlink){
|
|
|
+ if (!flink.exists())
|
|
|
+ FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(),
|
|
|
+ link);
|
|
|
+ }
|
|
|
+ return cacheFilePath(cacheStatus.localLoadPath);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private static boolean isTarFile(String filename) {
|
|
@@ -485,22 +469,27 @@ public class DistributedCache {
|
|
|
FileStatus fileStatus)
|
|
|
throws IOException {
|
|
|
// check for existence of the cache
|
|
|
- long dfsFileStamp;
|
|
|
- if (fileStatus != null) {
|
|
|
- dfsFileStamp = fileStatus.getModificationTime();
|
|
|
+ if (lcacheStatus.currentStatus == false) {
|
|
|
+ return false;
|
|
|
} else {
|
|
|
- dfsFileStamp = getTimestamp(conf, cache);
|
|
|
- }
|
|
|
+ long dfsFileStamp;
|
|
|
+ if (fileStatus != null) {
|
|
|
+ dfsFileStamp = fileStatus.getModificationTime();
|
|
|
+ } else {
|
|
|
+ dfsFileStamp = getTimestamp(conf, cache);
|
|
|
+ }
|
|
|
|
|
|
- // ensure that the file on hdfs hasn't been modified since the job started
|
|
|
- if (dfsFileStamp != confFileStamp) {
|
|
|
- LOG.fatal("File: " + cache + " has changed on HDFS since job started");
|
|
|
- throw new IOException("File: " + cache +
|
|
|
- " has changed on HDFS since job started");
|
|
|
- }
|
|
|
+ // ensure that the file on hdfs hasn't been modified since the job started
|
|
|
+ if (dfsFileStamp != confFileStamp) {
|
|
|
+ LOG.fatal("File: " + cache + " has changed on HDFS since job started");
|
|
|
+ throw new IOException("File: " + cache +
|
|
|
+ " has changed on HDFS since job started");
|
|
|
+ }
|
|
|
|
|
|
- if (dfsFileStamp != lcacheStatus.mtime) {
|
|
|
- return false;
|
|
|
+ if (dfsFileStamp != lcacheStatus.mtime) {
|
|
|
+ // needs refreshing
|
|
|
+ return false;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
return true;
|
|
@@ -852,6 +841,9 @@ public class DistributedCache {
|
|
|
}
|
|
|
|
|
|
private static class CacheStatus {
|
|
|
+ // false, not loaded yet, true is loaded
|
|
|
+ boolean currentStatus;
|
|
|
+
|
|
|
// the local load path of this cache
|
|
|
Path localLoadPath;
|
|
|
|
|
@@ -866,33 +858,16 @@ public class DistributedCache {
|
|
|
|
|
|
// the cache-file modification time
|
|
|
long mtime;
|
|
|
-
|
|
|
- // is it initialized?
|
|
|
- boolean inited = false;
|
|
|
|
|
|
public CacheStatus(Path baseDir, Path localLoadPath) {
|
|
|
super();
|
|
|
+ this.currentStatus = false;
|
|
|
this.localLoadPath = localLoadPath;
|
|
|
this.refcount = 0;
|
|
|
this.mtime = -1;
|
|
|
this.baseDir = baseDir;
|
|
|
this.size = 0;
|
|
|
}
|
|
|
-
|
|
|
- // get the base dir for the cache
|
|
|
- Path getBaseDir() {
|
|
|
- return baseDir;
|
|
|
- }
|
|
|
-
|
|
|
- // Is it initialized?
|
|
|
- boolean isInited() {
|
|
|
- return inited;
|
|
|
- }
|
|
|
-
|
|
|
- // mark it as initalized
|
|
|
- void initComplete() {
|
|
|
- inited = true;
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
/**
|