@@ -20,8 +20,11 @@ package org.apache.hadoop.filecache;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Random;
+import java.util.Set;
 import java.util.TreeMap;

 import org.apache.commons.logging.Log;
@@ -30,6 +33,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.RunJar;
@@ -57,9 +61,17 @@ public class TrackerDistributedCacheManager {
     LogFactory.getLog(TrackerDistributedCacheManager.class);

   private final LocalFileSystem localFs;
+
+  private LocalDirAllocator lDirAllocator;
+
+  private Configuration trackerConf;
+
+  private Random random = new Random();

   public TrackerDistributedCacheManager(Configuration conf) throws IOException {
     this.localFs = FileSystem.getLocal(conf);
+    this.trackerConf = conf;
+    this.lDirAllocator = new LocalDirAllocator("mapred.local.dir");
   }

   /**
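The constructor change above wires the cache manager to LocalDirAllocator, which chooses a directory from the comma-separated mapred.local.dir list with enough free space for each write, instead of localizing everything under a single base directory. A minimal standalone sketch of that allocation call, using only the calls the patch itself uses (the directory values and file size below are made up for illustration):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.LocalDirAllocator;
  import org.apache.hadoop.fs.Path;

  public class AllocatorSketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      // hypothetical volumes; a real TaskTracker reads these from its own config
      conf.set("mapred.local.dir", "/disk1/mapred/local,/disk2/mapred/local");
      LocalDirAllocator allocator = new LocalDirAllocator("mapred.local.dir");
      // ask for a write path under a relative cache name, reserving ~1 MB
      Path p = allocator.getLocalPathForWrite("archive/12345/my.jar", 1 << 20, conf);
      System.out.println("would localize to: " + p);
    }
  }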
@@ -69,7 +81,7 @@ public class TrackerDistributedCacheManager {
    * @param cache the cache to be localized, this should be specified as
    * new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
    * @param conf The Configuration file which contains the filesystem
-   * @param baseDir The base cache Dir where you wnat to localize the
+   * @param subDir The base cache subDir where you want to localize the
    *  files/archives
    * @param fileStatus The file status on the dfs.
    * @param isArchive if the cache is an archive or a file. In case it is an
@@ -92,35 +104,55 @@ public class TrackerDistributedCacheManager {
    * @throws IOException
    */
   Path getLocalCache(URI cache, Configuration conf,
-      Path baseDir, FileStatus fileStatus,
+      String subDir, FileStatus fileStatus,
       boolean isArchive, long confFileStamp,
       Path currentWorkDir, boolean honorSymLinkConf)
       throws IOException {
-    String cacheId = makeRelative(cache, conf);
+    String key = getKey(cache, conf, confFileStamp);
     CacheStatus lcacheStatus;
-    Path localizedPath;
+    Path localizedPath = null;
     synchronized (cachedArchives) {
-      lcacheStatus = cachedArchives.get(cacheId);
+      lcacheStatus = cachedArchives.get(key);
       if (lcacheStatus == null) {
         // was never localized
-        lcacheStatus = new CacheStatus(baseDir,
-            new Path(baseDir, new Path(cacheId)));
-        cachedArchives.put(cacheId, lcacheStatus);
+        String cachePath = new Path (subDir,
+            new Path(String.valueOf(random.nextLong()),
+                makeRelative(cache, conf))).toString();
+        Path localPath = lDirAllocator.getLocalPathForWrite(cachePath,
+            fileStatus.getLen(), trackerConf);
+        lcacheStatus = new CacheStatus(
+            new Path(localPath.toString().replace(cachePath, "")), localPath);
+        cachedArchives.put(key, lcacheStatus);
       }

-      synchronized (lcacheStatus) {
+      //mark the cache for use.
+      lcacheStatus.refcount++;
+    }
+
+    // do the localization, after releasing the global lock
+    synchronized (lcacheStatus) {
+      if (!lcacheStatus.isInited()) {
         localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus,
-            fileStatus, isArchive, currentWorkDir, honorSymLinkConf);
-        lcacheStatus.refcount++;
+            fileStatus, isArchive);
+        lcacheStatus.initComplete();
+      } else {
+        localizedPath = checkCacheStatusValidity(conf, cache, confFileStamp,
+            lcacheStatus, fileStatus, isArchive);
       }
+      createSymlink(conf, cache, lcacheStatus, isArchive,
+          currentWorkDir, honorSymLinkConf);
     }

     // try deleting stuff if you can
     long size = 0;
-    synchronized (baseDirSize) {
-      Long get = baseDirSize.get(baseDir);
-      if ( get != null ) {
-        size = get.longValue();
+    synchronized (lcacheStatus) {
+      synchronized (baseDirSize) {
+        Long get = baseDirSize.get(lcacheStatus.getBaseDir());
+        if ( get != null ) {
+          size = get.longValue();
+        } else {
+          LOG.warn("Cannot find size of baseDir: " + lcacheStatus.getBaseDir());
+        }
       }
     }
     // setting the cache size to a default of 10GB
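In the rewritten getLocalCache() above, the global cachedArchives lock now covers only the lookup and the refcount bump; the expensive localization runs afterwards under the per-entry CacheStatus lock, gated by isInited(), so requests for different files no longer serialize on one download. A standalone sketch of that locking pattern (illustrative only; the Entry class and names below are hypothetical, not part of the patch):

  import java.util.HashMap;
  import java.util.Map;

  public class LazyInitCache {
    static class Entry {
      int refcount;
      boolean inited;
      String localizedPath;   // filled in by the slow init step
    }

    private final Map<String, Entry> entries = new HashMap<String, Entry>();

    public String acquire(String key) {
      Entry e;
      synchronized (entries) {          // short critical section: lookup + refcount
        e = entries.get(key);
        if (e == null) {
          e = new Entry();
          entries.put(key, e);
        }
        e.refcount++;
      }
      synchronized (e) {                // slow work happens outside the global lock
        if (!e.inited) {
          e.localizedPath = "/local/cache/" + key;   // stands in for localizeCache()
          e.inited = true;
        }
        return e.localizedPath;
      }
    }

    public void release(String key) {
      synchronized (entries) {
        Entry e = entries.get(key);
        if (e != null) {
          e.refcount--;
        }
      }
    }
  }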
@@ -140,40 +172,58 @@ public class TrackerDistributedCacheManager {
    * is contained in.
    * @throws IOException
    */
-  void releaseCache(URI cache, Configuration conf)
+  void releaseCache(URI cache, Configuration conf, long timeStamp)
     throws IOException {
-    String cacheId = makeRelative(cache, conf);
+    String key = getKey(cache, conf, timeStamp);
     synchronized (cachedArchives) {
-      CacheStatus lcacheStatus = cachedArchives.get(cacheId);
-      if (lcacheStatus == null)
+      CacheStatus lcacheStatus = cachedArchives.get(key);
+      if (lcacheStatus == null) {
+        LOG.warn("Cannot find localized cache: " + cache +
+            " (key: " + key + ") in releaseCache!");
         return;
-      synchronized (lcacheStatus) {
-        lcacheStatus.refcount--;
       }
+
+      // decrement ref count
+      lcacheStatus.refcount--;
     }
   }

   // To delete the caches which have a refcount of zero

   private void deleteCache(Configuration conf) throws IOException {
+    Set<CacheStatus> deleteSet = new HashSet<CacheStatus>();
     // try deleting cache Status with refcount of zero
     synchronized (cachedArchives) {
       for (Iterator<String> it = cachedArchives.keySet().iterator();
           it.hasNext();) {
         String cacheId = it.next();
         CacheStatus lcacheStatus = cachedArchives.get(cacheId);
-        synchronized (lcacheStatus) {
-          if (lcacheStatus.refcount == 0) {
-            // delete this cache entry
-            FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
-            synchronized (baseDirSize) {
-              Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
-              if ( dirSize != null ) {
-                dirSize -= lcacheStatus.size;
-                baseDirSize.put(lcacheStatus.baseDir, dirSize);
-              }
-            }
-            it.remove();
+
+        // if reference count is zero
+        // mark the cache for deletion
+        if (lcacheStatus.refcount == 0) {
+          // delete this cache entry from the global list
+          // and mark the localized file for deletion
+          deleteSet.add(lcacheStatus);
+          it.remove();
+        }
+      }
+    }
+
+    // do the deletion, after releasing the global lock
+    for (CacheStatus lcacheStatus : deleteSet) {
+      synchronized (lcacheStatus) {
+        FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
+        LOG.info("Deleted path " + lcacheStatus.localLoadPath);
+        // decrement the size of the cache from baseDirSize
+        synchronized (baseDirSize) {
+          Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
+          if ( dirSize != null ) {
+            dirSize -= lcacheStatus.size;
+            baseDirSize.put(lcacheStatus.baseDir, dirSize);
+          } else {
+            LOG.warn("Cannot find record of the baseDir: " +
+                lcacheStatus.baseDir + " during delete!");
          }
        }
      }
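The rewritten deleteCache() above splits the sweep into two phases: entries with a zero refcount are collected into deleteSet and unhooked from the map while the global lock is held, and the slow filesystem deletion happens afterwards, outside that lock. A standalone sketch of this collect-then-delete pattern (illustrative only; the class and field names are hypothetical):

  import java.io.File;
  import java.util.ArrayList;
  import java.util.HashMap;
  import java.util.Iterator;
  import java.util.List;
  import java.util.Map;

  public class TwoPhaseSweeper {
    static class Entry {
      int refcount;
      File localFile;
    }

    private final Map<String, Entry> cache = new HashMap<String, Entry>();

    public void sweep() {
      List<Entry> doomed = new ArrayList<Entry>();
      // phase 1: under the map lock, only decide what to delete
      synchronized (cache) {
        for (Iterator<Entry> it = cache.values().iterator(); it.hasNext();) {
          Entry e = it.next();
          if (e.refcount == 0) {
            doomed.add(e);
            it.remove();
          }
        }
      }
      // phase 2: slow filesystem work, done without holding the map lock
      for (Entry e : doomed) {
        e.localFile.delete();
      }
    }
  }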
@@ -206,144 +256,110 @@ public class TrackerDistributedCacheManager {
     return path;
   }

-  private Path cacheFilePath(Path p) {
-    return new Path(p, p.getName());
+  private Path checkCacheStatusValidity(Configuration conf,
+      URI cache, long confFileStamp,
+      CacheStatus cacheStatus,
+      FileStatus fileStatus,
+      boolean isArchive
+      ) throws IOException {
+    FileSystem fs = FileSystem.get(cache, conf);
+    // Has to be
+    if (!ifExistsAndFresh(conf, fs, cache, confFileStamp,
+        cacheStatus, fileStatus)) {
+      throw new IOException("Stale cache file: " + cacheStatus.localLoadPath +
+          " for cache-file: " + cache);
+    }
+    LOG.info(String.format("Using existing cache of %s->%s",
+        cache.toString(), cacheStatus.localLoadPath));
+    return cacheStatus.localLoadPath;
   }

-  // the method which actually copies the caches locally and unjars/unzips them
-  // and does chmod for the files
-  private Path localizeCache(Configuration conf,
-      URI cache, long confFileStamp,
-      CacheStatus cacheStatus,
-      FileStatus fileStatus,
-      boolean isArchive,
-      Path currentWorkDir,
-      boolean honorSymLinkConf)
-      throws IOException {
+  private void createSymlink(Configuration conf, URI cache,
+      CacheStatus cacheStatus, boolean isArchive,
+      Path currentWorkDir, boolean honorSymLinkConf) throws IOException {
     boolean doSymlink = honorSymLinkConf && DistributedCache.getSymlink(conf);
     if(cache.getFragment() == null) {
       doSymlink = false;
     }
-    FileSystem fs = FileSystem.get(cache, conf);
+
     String link =
       currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
     File flink = new File(link);
-    if (ifExistsAndFresh(conf, fs, cache, confFileStamp,
-        cacheStatus, fileStatus)) {
-      LOG.info(String.format("Using existing cache of %s->%s",
-          cache.toString(), cacheStatus.localLoadPath));
-      if (isArchive) {
-        if (doSymlink){
-          if (!flink.exists())
-            FileUtil.symLink(cacheStatus.localLoadPath.toString(),
-                link);
-        }
-
-        return cacheStatus.localLoadPath;
-      }
-      else {
-        if (doSymlink){
-          if (!flink.exists())
-            FileUtil.symLink(
-                cacheFilePath(cacheStatus.localLoadPath).toString(), link);
-        }
-        return cacheFilePath(cacheStatus.localLoadPath);
-      }
-    } else {
-
-      // remove the old archive
-      // if the old archive cannot be removed since it is being used by another
-      // job
-      // return null
-      if (cacheStatus.refcount > 1 && (cacheStatus.currentStatus == true))
-        throw new IOException("Cache " + cacheStatus.localLoadPath.toString()
-            + " is in use and cannot be refreshed");
-
-      FileSystem localFs = FileSystem.getLocal(conf);
-      localFs.delete(cacheStatus.localLoadPath, true);
-      synchronized (baseDirSize) {
-        Long dirSize = baseDirSize.get(cacheStatus.baseDir);
-        if ( dirSize != null ) {
-          dirSize -= cacheStatus.size;
-          baseDirSize.put(cacheStatus.baseDir, dirSize);
-        }
-      }
-      Path parchive = new Path(cacheStatus.localLoadPath,
-          new Path(cacheStatus.localLoadPath.getName()));
-
-      if (!localFs.mkdirs(cacheStatus.localLoadPath)) {
-        throw new IOException("Mkdirs failed to create directory " +
-            cacheStatus.localLoadPath.toString());
+    if (doSymlink){
+      if (!flink.exists()) {
+        FileUtil.symLink(cacheStatus.localLoadPath.toString(), link);
       }
+    }
+  }

-      String cacheId = cache.getPath();
-      fs.copyToLocalFile(new Path(cacheId), parchive);
-      if (isArchive) {
-        String tmpArchive = parchive.toString().toLowerCase();
-        File srcFile = new File(parchive.toString());
-        File destDir = new File(parchive.getParent().toString());
-        LOG.info(String.format("Extracting %s to %s",
-            srcFile.toString(), destDir.toString()));
-        if (tmpArchive.endsWith(".jar")) {
-          RunJar.unJar(srcFile, destDir);
-        } else if (tmpArchive.endsWith(".zip")) {
-          FileUtil.unZip(srcFile, destDir);
-        } else if (isTarFile(tmpArchive)) {
-          FileUtil.unTar(srcFile, destDir);
-        } else {
-          LOG.warn(String.format(
-              "Cache file %s specified as archive, but not valid extension.",
+  //the method which actually copies the caches locally and unjars/unzips them
+  // and does chmod for the files
+  private Path localizeCache(Configuration conf,
+      URI cache, long confFileStamp,
+      CacheStatus cacheStatus,
+      FileStatus fileStatus,
+      boolean isArchive)
+      throws IOException {
+    FileSystem fs = FileSystem.get(cache, conf);
+    FileSystem localFs = FileSystem.getLocal(conf);
+    Path parchive = null;
+    if (isArchive) {
+      parchive = new Path(cacheStatus.localLoadPath,
+          new Path(cacheStatus.localLoadPath.getName()));
+    } else {
+      parchive = cacheStatus.localLoadPath;
+    }
+    if (!localFs.mkdirs(parchive.getParent())) {
+      throw new IOException("Mkdirs failed to create directory " +
+          cacheStatus.localLoadPath.toString());
+    }
+    String cacheId = cache.getPath();
+    fs.copyToLocalFile(new Path(cacheId), parchive);
+    if (isArchive) {
+      String tmpArchive = parchive.toString().toLowerCase();
+      File srcFile = new File(parchive.toString());
+      File destDir = new File(parchive.getParent().toString());
+      LOG.info(String.format("Extracting %s to %s",
+          srcFile.toString(), destDir.toString()));
+      if (tmpArchive.endsWith(".jar")) {
+        RunJar.unJar(srcFile, destDir);
+      } else if (tmpArchive.endsWith(".zip")) {
+        FileUtil.unZip(srcFile, destDir);
+      } else if (isTarFile(tmpArchive)) {
+        FileUtil.unTar(srcFile, destDir);
+      } else {
+        LOG.warn(String.format(
+            "Cache file %s specified as archive, but not valid extension.",
            srcFile.toString()));
-          // else will not do anyhting
-          // and copy the file into the dir as it is
-        }
-      }
-
-      long cacheSize =
-        FileUtil.getDU(new File(parchive.getParent().toString()));
-      cacheStatus.size = cacheSize;
-      synchronized (baseDirSize) {
-        Long dirSize = baseDirSize.get(cacheStatus.baseDir);
-        if( dirSize == null ) {
-          dirSize = Long.valueOf(cacheSize);
-        } else {
-          dirSize += cacheSize;
-        }
-        baseDirSize.put(cacheStatus.baseDir, dirSize);
+        // else will not do anyhting
+        // and copy the file into the dir as it is
      }
-
-      // do chmod here
-      try {
-        //Setting recursive permission to grant everyone read and execute
-        FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx",true);
-      } catch(InterruptedException e) {
-        LOG.warn("Exception in chmod" + e.toString());
-      }
-
-      // update cacheStatus to reflect the newly cached file
-      cacheStatus.currentStatus = true;
-      cacheStatus.mtime = DistributedCache.getTimestamp(conf, cache);
-
-      LOG.info(String.format("Cached %s as %s",
-          cache.toString(), cacheStatus.localLoadPath));
    }
-
-    if (isArchive){
-      if (doSymlink){
-        if (!flink.exists())
-          FileUtil.symLink(cacheStatus.localLoadPath.toString(),
-              link);
+    long cacheSize =
+      FileUtil.getDU(new File(parchive.getParent().toString()));
+    cacheStatus.size = cacheSize;
+    synchronized (baseDirSize) {
+      Long dirSize = baseDirSize.get(cacheStatus.baseDir);
+      if( dirSize == null ) {
+        dirSize = Long.valueOf(cacheSize);
+      } else {
+        dirSize += cacheSize;
      }
-      return cacheStatus.localLoadPath;
+      baseDirSize.put(cacheStatus.baseDir, dirSize);
    }
-    else {
-      if (doSymlink){
-        if (!flink.exists())
-          FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(),
-              link);
-      }
-      return cacheFilePath(cacheStatus.localLoadPath);
+    // do chmod here
+    try {
+      //Setting recursive permission to grant everyone read and execute
+      FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx",true);
+    } catch(InterruptedException e) {
+      LOG.warn("Exception in chmod" + e.toString());
    }
+    // update cacheStatus to reflect the newly cached file
+    cacheStatus.mtime = DistributedCache.getTimestamp(conf, cache);
+
+    LOG.info(String.format("Cached %s as %s",
+        cache.toString(), cacheStatus.localLoadPath));
+    return cacheStatus.localLoadPath;
   }

   private static boolean isTarFile(String filename) {
@@ -357,33 +373,31 @@ public class TrackerDistributedCacheManager {
       CacheStatus lcacheStatus,
       FileStatus fileStatus)
       throws IOException {
-    // check for existence of the cache
-    if (lcacheStatus.currentStatus == false) {
-      return false;
+    long dfsFileStamp;
+    if (fileStatus != null) {
+      dfsFileStamp = fileStatus.getModificationTime();
     } else {
-      long dfsFileStamp;
-      if (fileStatus != null) {
-        dfsFileStamp = fileStatus.getModificationTime();
-      } else {
-        dfsFileStamp = DistributedCache.getTimestamp(conf, cache);
-      }
-
-      // ensure that the file on hdfs hasn't been modified since the job started
-      if (dfsFileStamp != confFileStamp) {
-        LOG.fatal("File: " + cache + " has changed on HDFS since job started");
-        throw new IOException("File: " + cache +
-            " has changed on HDFS since job started");
-      }
+      dfsFileStamp = DistributedCache.getTimestamp(conf, cache);
+    }

-      if (dfsFileStamp != lcacheStatus.mtime) {
-        // needs refreshing
-        return false;
-      }
+    // ensure that the file on hdfs hasn't been modified since the job started
+    if (dfsFileStamp != confFileStamp) {
+      LOG.fatal("File: " + cache + " has changed on HDFS since job started");
+      throw new IOException("File: " + cache +
+          " has changed on HDFS since job started");
     }

+    if (dfsFileStamp != lcacheStatus.mtime) {
+      return false;
+    }
     return true;
   }

+  String getKey(URI cache, Configuration conf, long timeStamp)
+      throws IOException {
+    return makeRelative(cache, conf) + String.valueOf(timeStamp);
+  }
+
   /**
    * This method create symlinks for all files in a given dir in another
    * directory.
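The new getKey() above appends the cache file's timestamp to its relative path, so the same URI seen with a different modification time maps to a separate map entry (and a separate localized copy) instead of colliding with the stale one. A tiny illustrative sketch of that behaviour (class name and values below are hypothetical, not part of the patch):

  public class CacheKeyDemo {
    // mirrors the idea of getKey(): relative path + timestamp
    static String key(String relativePath, long timeStamp) {
      return relativePath + String.valueOf(timeStamp);
    }

    public static void main(String[] args) {
      String first = key("hdfs/user/alice/dict.txt", 1260000000000L);
      String second = key("hdfs/user/alice/dict.txt", 1260000500000L);
      // different timestamps -> different keys -> two independent cache entries
      System.out.println(first.equals(second));   // prints false
    }
  }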
@@ -419,9 +433,6 @@ public class TrackerDistributedCacheManager {
   }

   private static class CacheStatus {
-    // false, not loaded yet, true is loaded
-    boolean currentStatus;
-
     // the local load path of this cache
     Path localLoadPath;

@@ -437,15 +448,31 @@ public class TrackerDistributedCacheManager {
     // the cache-file modification time
     long mtime;

+    // is it initialized ?
+    boolean inited = false;
+
     public CacheStatus(Path baseDir, Path localLoadPath) {
       super();
-      this.currentStatus = false;
       this.localLoadPath = localLoadPath;
       this.refcount = 0;
       this.mtime = -1;
       this.baseDir = baseDir;
       this.size = 0;
     }
+
+    Path getBaseDir(){
+      return this.baseDir;
+    }
+
+    // mark it as initialized
+    void initComplete() {
+      inited = true;
+    }
+
+    // is it initialized?
+    boolean isInited() {
+      return inited;
+    }
   }

   /**