|
@@ -68,23 +68,23 @@ public class DistributedCache {
|
|
|
CacheStatus lcacheStatus;
|
|
|
Path localizedPath;
|
|
|
synchronized (cachedArchives) {
|
|
|
- if (!cachedArchives.containsKey(cacheId)) {
|
|
|
+ lcacheStatus = cachedArchives.get(cacheId);
|
|
|
+ if (lcacheStatus == null) {
|
|
|
// was never localized
|
|
|
lcacheStatus = new CacheStatus();
|
|
|
lcacheStatus.currentStatus = false;
|
|
|
- lcacheStatus.refcount = 1;
|
|
|
+ lcacheStatus.refcount = 0;
|
|
|
lcacheStatus.localLoadPath = new Path(baseDir, new Path(cacheId));
|
|
|
cachedArchives.put(cacheId, lcacheStatus);
|
|
|
- } else {
|
|
|
- lcacheStatus = (CacheStatus) cachedArchives.get(cacheId);
|
|
|
- synchronized (lcacheStatus) {
|
|
|
- lcacheStatus.refcount++;
|
|
|
- }
|
|
|
+ }
|
|
|
+
|
|
|
+ synchronized (lcacheStatus) {
|
|
|
+ localizedPath = localizeCache(cache, lcacheStatus, conf, isArchive,
|
|
|
+ md5, currentWorkDir);
|
|
|
+ lcacheStatus.refcount++;
|
|
|
}
|
|
|
}
|
|
|
- synchronized (lcacheStatus) {
|
|
|
- localizedPath = localizeCache(cache, lcacheStatus, conf, isArchive, md5, currentWorkDir);
|
|
|
- }
|
|
|
+
|
|
|
// try deleting stuff if you can
|
|
|
long size = FileUtil.getDU(new File(baseDir.toString()));
|
|
|
// setting the cache size to a default of 1MB
|
|
@@ -125,10 +125,12 @@ public class DistributedCache {
|
|
|
for (Iterator it = cachedArchives.keySet().iterator(); it.hasNext();) {
|
|
|
String cacheId = (String) it.next();
|
|
|
CacheStatus lcacheStatus = (CacheStatus) cachedArchives.get(cacheId);
|
|
|
- if (lcacheStatus.refcount == 0) {
|
|
|
- // delete this cache entry
|
|
|
- FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath);
|
|
|
- it.remove();
|
|
|
+ synchronized (lcacheStatus) {
|
|
|
+ if (lcacheStatus.refcount == 0) {
|
|
|
+ // delete this cache entry
|
|
|
+ FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath);
|
|
|
+ it.remove();
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|