|
@@ -202,15 +202,13 @@ public class DistributedCache {
|
|
lcacheStatus = cachedArchives.get(key);
|
|
lcacheStatus = cachedArchives.get(key);
|
|
if (lcacheStatus == null) {
|
|
if (lcacheStatus == null) {
|
|
// was never localized
|
|
// was never localized
|
|
- Path uniqueParentDir =
|
|
|
|
- new Path(subDir, String.valueOf(random.nextLong()));
|
|
|
|
- String cachePath = new Path(uniqueParentDir,
|
|
|
|
- makeRelative(cache, conf)).toString();
|
|
|
|
|
|
+ String cachePath = new Path (subDir,
|
|
|
|
+ new Path(String.valueOf(random.nextLong()),
|
|
|
|
+ makeRelative(cache, conf))).toString();
|
|
Path localPath = lDirAllocator.getLocalPathForWrite(cachePath,
|
|
Path localPath = lDirAllocator.getLocalPathForWrite(cachePath,
|
|
fileStatus.getLen(), conf);
|
|
fileStatus.getLen(), conf);
|
|
- lcacheStatus =
|
|
|
|
- new CacheStatus(new Path(localPath.toString().replace(cachePath, "")),
|
|
|
|
- localPath, uniqueParentDir);
|
|
|
|
|
|
+ lcacheStatus = new CacheStatus(
|
|
|
|
+ new Path(localPath.toString().replace(cachePath, "")), localPath);
|
|
cachedArchives.put(key, lcacheStatus);
|
|
cachedArchives.put(key, lcacheStatus);
|
|
}
|
|
}
|
|
lcacheStatus.refcount++;
|
|
lcacheStatus.refcount++;
|
|
@@ -328,17 +326,17 @@ public class DistributedCache {
|
|
// do the deletion, after releasing the global lock
|
|
// do the deletion, after releasing the global lock
|
|
for (CacheStatus lcacheStatus : deleteSet) {
|
|
for (CacheStatus lcacheStatus : deleteSet) {
|
|
synchronized (lcacheStatus) {
|
|
synchronized (lcacheStatus) {
|
|
- FileSystem.getLocal(conf).delete(lcacheStatus.localizedLoadPath, true);
|
|
|
|
- LOG.info("Deleted path " + lcacheStatus.localizedLoadPath);
|
|
|
|
|
|
+ FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
|
|
|
|
+ LOG.info("Deleted path " + lcacheStatus.localLoadPath);
|
|
// decrement the size of the cache from baseDirSize
|
|
// decrement the size of the cache from baseDirSize
|
|
synchronized (baseDirSize) {
|
|
synchronized (baseDirSize) {
|
|
- Long dirSize = baseDirSize.get(lcacheStatus.localizedBaseDir);
|
|
|
|
|
|
+ Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
|
|
if ( dirSize != null ) {
|
|
if ( dirSize != null ) {
|
|
dirSize -= lcacheStatus.size;
|
|
dirSize -= lcacheStatus.size;
|
|
- baseDirSize.put(lcacheStatus.localizedBaseDir, dirSize);
|
|
|
|
|
|
+ baseDirSize.put(lcacheStatus.baseDir, dirSize);
|
|
} else {
|
|
} else {
|
|
LOG.warn("Cannot find record of the baseDir: " +
|
|
LOG.warn("Cannot find record of the baseDir: " +
|
|
- lcacheStatus.localizedBaseDir + " during delete!");
|
|
|
|
|
|
+ lcacheStatus.baseDir + " during delete!");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -386,13 +384,12 @@ public class DistributedCache {
|
|
// Has to be
|
|
// Has to be
|
|
if (!ifExistsAndFresh(conf, fs, cache, confFileStamp,
|
|
if (!ifExistsAndFresh(conf, fs, cache, confFileStamp,
|
|
cacheStatus, fileStatus)) {
|
|
cacheStatus, fileStatus)) {
|
|
- throw new IOException("Stale cache file: " +
|
|
|
|
- cacheStatus.localizedLoadPath +
|
|
|
|
|
|
+ throw new IOException("Stale cache file: " + cacheStatus.localLoadPath +
|
|
" for cache-file: " + cache);
|
|
" for cache-file: " + cache);
|
|
}
|
|
}
|
|
LOG.info(String.format("Using existing cache of %s->%s",
|
|
LOG.info(String.format("Using existing cache of %s->%s",
|
|
- cache.toString(), cacheStatus.localizedLoadPath));
|
|
|
|
- return cacheStatus.localizedLoadPath;
|
|
|
|
|
|
+ cache.toString(), cacheStatus.localLoadPath));
|
|
|
|
+ return cacheStatus.localLoadPath;
|
|
}
|
|
}
|
|
|
|
|
|
private static void createSymlink(Configuration conf, URI cache,
|
|
private static void createSymlink(Configuration conf, URI cache,
|
|
@@ -407,7 +404,7 @@ public class DistributedCache {
|
|
File flink = new File(link);
|
|
File flink = new File(link);
|
|
if (doSymlink){
|
|
if (doSymlink){
|
|
if (!flink.exists()) {
|
|
if (!flink.exists()) {
|
|
- FileUtil.symLink(cacheStatus.localizedLoadPath.toString(), link);
|
|
|
|
|
|
+ FileUtil.symLink(cacheStatus.localLoadPath.toString(), link);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -424,15 +421,15 @@ public class DistributedCache {
|
|
FileSystem localFs = FileSystem.getLocal(conf);
|
|
FileSystem localFs = FileSystem.getLocal(conf);
|
|
Path parchive = null;
|
|
Path parchive = null;
|
|
if (isArchive) {
|
|
if (isArchive) {
|
|
- parchive = new Path(cacheStatus.localizedLoadPath,
|
|
|
|
- new Path(cacheStatus.localizedLoadPath.getName()));
|
|
|
|
|
|
+ parchive = new Path(cacheStatus.localLoadPath,
|
|
|
|
+ new Path(cacheStatus.localLoadPath.getName()));
|
|
} else {
|
|
} else {
|
|
- parchive = cacheStatus.localizedLoadPath;
|
|
|
|
|
|
+ parchive = cacheStatus.localLoadPath;
|
|
}
|
|
}
|
|
|
|
|
|
if (!localFs.mkdirs(parchive.getParent())) {
|
|
if (!localFs.mkdirs(parchive.getParent())) {
|
|
throw new IOException("Mkdirs failed to create directory " +
|
|
throw new IOException("Mkdirs failed to create directory " +
|
|
- cacheStatus.localizedLoadPath.toString());
|
|
|
|
|
|
+ cacheStatus.localLoadPath.toString());
|
|
}
|
|
}
|
|
String cacheId = cache.getPath();
|
|
String cacheId = cache.getPath();
|
|
fs.copyToLocalFile(new Path(cacheId), parchive);
|
|
fs.copyToLocalFile(new Path(cacheId), parchive);
|
|
@@ -454,29 +451,26 @@ public class DistributedCache {
|
|
long cacheSize = FileUtil.getDU(new File(parchive.getParent().toString()));
|
|
long cacheSize = FileUtil.getDU(new File(parchive.getParent().toString()));
|
|
cacheStatus.size = cacheSize;
|
|
cacheStatus.size = cacheSize;
|
|
synchronized (baseDirSize) {
|
|
synchronized (baseDirSize) {
|
|
- Long dirSize = baseDirSize.get(cacheStatus.localizedBaseDir);
|
|
|
|
|
|
+ Long dirSize = baseDirSize.get(cacheStatus.baseDir);
|
|
if (dirSize == null) {
|
|
if (dirSize == null) {
|
|
dirSize = Long.valueOf(cacheSize);
|
|
dirSize = Long.valueOf(cacheSize);
|
|
} else {
|
|
} else {
|
|
dirSize += cacheSize;
|
|
dirSize += cacheSize;
|
|
}
|
|
}
|
|
- baseDirSize.put(cacheStatus.localizedBaseDir, dirSize);
|
|
|
|
|
|
+ baseDirSize.put(cacheStatus.baseDir, dirSize);
|
|
}
|
|
}
|
|
|
|
|
|
// do chmod here
|
|
// do chmod here
|
|
try {
|
|
try {
|
|
//Setting recursive permission to grant everyone read and execute
|
|
//Setting recursive permission to grant everyone read and execute
|
|
- Path localDir = new Path(cacheStatus.localizedBaseDir,
|
|
|
|
- cacheStatus.uniqueParentDir);
|
|
|
|
- LOG.info("Doing chmod on localdir :" + localDir);
|
|
|
|
- FileUtil.chmod(localDir.toString(), "ugo+rx", true);
|
|
|
|
|
|
+ FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx",true);
|
|
} catch(InterruptedException e) {
|
|
} catch(InterruptedException e) {
|
|
LOG.warn("Exception in chmod" + e.toString());
|
|
LOG.warn("Exception in chmod" + e.toString());
|
|
}
|
|
}
|
|
|
|
|
|
// update cacheStatus to reflect the newly cached file
|
|
// update cacheStatus to reflect the newly cached file
|
|
cacheStatus.mtime = getTimestamp(conf, cache);
|
|
cacheStatus.mtime = getTimestamp(conf, cache);
|
|
- return cacheStatus.localizedLoadPath;
|
|
|
|
|
|
+ return cacheStatus.localLoadPath;
|
|
}
|
|
}
|
|
|
|
|
|
private static boolean isTarFile(String filename) {
|
|
private static boolean isTarFile(String filename) {
|
|
@@ -859,13 +853,10 @@ public class DistributedCache {
|
|
|
|
|
|
private static class CacheStatus {
|
|
private static class CacheStatus {
|
|
// the local load path of this cache
|
|
// the local load path of this cache
|
|
- Path localizedLoadPath;
|
|
|
|
|
|
+ Path localLoadPath;
|
|
|
|
|
|
//the base dir where the cache lies
|
|
//the base dir where the cache lies
|
|
- Path localizedBaseDir;
|
|
|
|
-
|
|
|
|
- // the unique directory in localizedBaseDir, where the cache lies
|
|
|
|
- Path uniqueParentDir;
|
|
|
|
|
|
+ Path baseDir;
|
|
|
|
|
|
//the size of this cache
|
|
//the size of this cache
|
|
long size;
|
|
long size;
|
|
@@ -879,19 +870,18 @@ public class DistributedCache {
|
|
// is it initialized?
|
|
// is it initialized?
|
|
boolean inited = false;
|
|
boolean inited = false;
|
|
|
|
|
|
- public CacheStatus(Path baseDir, Path localLoadPath, Path uniqueParentDir) {
|
|
|
|
|
|
+ public CacheStatus(Path baseDir, Path localLoadPath) {
|
|
super();
|
|
super();
|
|
- this.localizedLoadPath = localLoadPath;
|
|
|
|
|
|
+ this.localLoadPath = localLoadPath;
|
|
this.refcount = 0;
|
|
this.refcount = 0;
|
|
this.mtime = -1;
|
|
this.mtime = -1;
|
|
- this.localizedBaseDir = baseDir;
|
|
|
|
|
|
+ this.baseDir = baseDir;
|
|
this.size = 0;
|
|
this.size = 0;
|
|
- this.uniqueParentDir = uniqueParentDir;
|
|
|
|
}
|
|
}
|
|
|
|
|
|
// get the base dir for the cache
|
|
// get the base dir for the cache
|
|
Path getBaseDir() {
|
|
Path getBaseDir() {
|
|
- return localizedBaseDir;
|
|
|
|
|
|
+ return baseDir;
|
|
}
|
|
}
|
|
|
|
|
|
// Is it initialized?
|
|
// Is it initialized?
|
|
@@ -915,7 +905,7 @@ public class DistributedCache {
|
|
FileSystem localFs = FileSystem.getLocal(conf);
|
|
FileSystem localFs = FileSystem.getLocal(conf);
|
|
for (Map.Entry<String,CacheStatus> f: cachedArchives.entrySet()) {
|
|
for (Map.Entry<String,CacheStatus> f: cachedArchives.entrySet()) {
|
|
try {
|
|
try {
|
|
- localFs.delete(f.getValue().localizedLoadPath, true);
|
|
|
|
|
|
+ localFs.delete(f.getValue().localLoadPath, true);
|
|
} catch (IOException ie) {
|
|
} catch (IOException ie) {
|
|
LOG.debug("Error cleaning up cache", ie);
|
|
LOG.debug("Error cleaning up cache", ie);
|
|
}
|
|
}
|