|
@@ -33,6 +33,7 @@ import junit.framework.TestCase;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.filecache.TaskDistributedCacheManager.CacheFile;
|
|
|
import org.apache.hadoop.mapred.DefaultTaskController;
|
|
|
import org.apache.hadoop.mapred.JobID;
|
|
|
import org.apache.hadoop.mapred.JobLocalizer;
|
|
@@ -255,8 +256,7 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
JobLocalizer.downloadPrivateCache(conf1);
|
|
|
handle.release();
|
|
|
for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
|
|
|
- assertEquals(0, manager.getReferenceCount(c.uri, conf1, c.timestamp,
|
|
|
- c.owner));
|
|
|
+ assertEquals(0, manager.getReferenceCount(c.getStatus()));
|
|
|
}
|
|
|
|
|
|
Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
|
|
@@ -294,17 +294,15 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
th = null;
|
|
|
for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
|
|
|
try {
|
|
|
- int refcount = manager.getReferenceCount(c.uri, conf2, c.timestamp,
|
|
|
- c.owner);
|
|
|
+ int refcount = manager.getReferenceCount(c.getStatus());
|
|
|
LOG.info("checking refcount " + c.uri + " of " + refcount);
|
|
|
assertEquals(0, refcount);
|
|
|
- } catch (IOException ie) {
|
|
|
+ } catch (NullPointerException ie) {
|
|
|
th = ie;
|
|
|
LOG.info("Exception getting reference count for " + c.uri, ie);
|
|
|
}
|
|
|
}
|
|
|
assertNotNull(th);
|
|
|
- assertTrue(th.getMessage().contains(thirdCacheFile.getName()));
|
|
|
fs.delete(thirdCacheFile, false);
|
|
|
}
|
|
|
|
|
@@ -417,8 +415,8 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
}
|
|
|
Path localizedPath =
|
|
|
manager.getLocalCache(cacheFile.toUri(), conf1, distCacheDir,
|
|
|
- fs.getFileStatus(cacheFile), false,
|
|
|
- c.timestamp, visibility);
|
|
|
+ fs.getFileStatus(cacheFile), false,
|
|
|
+ c.timestamp, visibility, c);
|
|
|
assertTrue("Cache file didn't get localized in the expected directory. " +
|
|
|
"Expected localization to happen within " +
|
|
|
ROOT_MAPRED_LOCAL_DIR + "/" + distCacheDir +
|
|
@@ -530,21 +528,31 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
conf2.set("user.name", userName);
|
|
|
|
|
|
// We first test the size limit
|
|
|
+ FileStatus stat = fs.getFileStatus(firstCacheFilePublic);
|
|
|
+ CacheFile cfile1 = new CacheFile(firstCacheFilePublic.toUri(),
|
|
|
+ CacheFile.FileType.REGULAR, true,
|
|
|
+ stat.getModificationTime(),
|
|
|
+ true);
|
|
|
Path firstLocalCache = manager.getLocalCache(firstCacheFilePublic.toUri(), conf2,
|
|
|
TaskTracker.getPrivateDistributedCacheDir(userName),
|
|
|
fs.getFileStatus(firstCacheFilePublic), false,
|
|
|
- fs.getFileStatus(firstCacheFilePublic).getModificationTime(), true);
|
|
|
- manager.releaseCache(firstCacheFilePublic.toUri(), conf2,
|
|
|
- fs.getFileStatus(firstCacheFilePublic).getModificationTime(),
|
|
|
- TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
|
|
|
+ fs.getFileStatus(firstCacheFilePublic).getModificationTime(), true,
|
|
|
+ cfile1);
|
|
|
+ manager.releaseCache(cfile1.getStatus());
|
|
|
//in above code,localized a file of size 4K and then release the cache
|
|
|
// which will cause the cache be deleted when the limit goes out.
|
|
|
// The below code localize another cache which's designed to
|
|
|
//sweep away the first cache.
|
|
|
+ stat = fs.getFileStatus(secondCacheFilePublic);
|
|
|
+ CacheFile cfile2 = new CacheFile(secondCacheFilePublic.toUri(),
|
|
|
+ CacheFile.FileType.REGULAR, true,
|
|
|
+ stat.getModificationTime(),
|
|
|
+ true);
|
|
|
Path secondLocalCache = manager.getLocalCache(secondCacheFilePublic.toUri(), conf2,
|
|
|
TaskTracker.getPrivateDistributedCacheDir(userName),
|
|
|
fs.getFileStatus(secondCacheFilePublic), false,
|
|
|
- fs.getFileStatus(secondCacheFilePublic).getModificationTime(), true);
|
|
|
+ fs.getFileStatus(secondCacheFilePublic).getModificationTime(), true,
|
|
|
+ cfile2);
|
|
|
assertFalse("DistributedCache failed deleting old" +
|
|
|
" cache when the cache store is full.",
|
|
|
localfs.exists(firstLocalCache));
|
|
@@ -583,21 +591,29 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
// limit but does not trigger the file size limit.
|
|
|
createPublicTempFile(thirdCacheFile);
|
|
|
createPublicTempFile(fourthCacheFile);
|
|
|
+ stat = fs.getFileStatus(thirdCacheFile);
|
|
|
+ CacheFile cfile3 = new CacheFile(thirdCacheFile.toUri(),
|
|
|
+ CacheFile.FileType.REGULAR, false,
|
|
|
+ stat.getModificationTime(),
|
|
|
+ true);
|
|
|
Path thirdLocalCache = manager.getLocalCache(thirdCacheFile.toUri(), conf2,
|
|
|
TaskTracker.getPrivateDistributedCacheDir(userName),
|
|
|
fs.getFileStatus(thirdCacheFile), false,
|
|
|
fs.getFileStatus(thirdCacheFile).getModificationTime(),
|
|
|
- true);
|
|
|
+ true, cfile3);
|
|
|
// Release the third cache so that it can be deleted while sweeping
|
|
|
- manager.releaseCache(thirdCacheFile.toUri(), conf2,
|
|
|
- fs.getFileStatus(thirdCacheFile).getModificationTime(),
|
|
|
- TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
|
|
|
+ manager.releaseCache(cfile3.getStatus());
|
|
|
// Getting the fourth cache will make the number of sub directories becomes
|
|
|
// 3 which is greater than 2. So the released cache will be deleted.
|
|
|
+ stat = fs.getFileStatus(fourthCacheFile);
|
|
|
+ CacheFile cfile4 = new CacheFile(fourthCacheFile.toUri(),
|
|
|
+ CacheFile.FileType.REGULAR, false,
|
|
|
+ stat.getModificationTime(),
|
|
|
+ true);
|
|
|
Path fourthLocalCache = manager.getLocalCache(fourthCacheFile.toUri(), conf2,
|
|
|
TaskTracker.getPrivateDistributedCacheDir(userName),
|
|
|
fs.getFileStatus(fourthCacheFile), false,
|
|
|
- fs.getFileStatus(fourthCacheFile).getModificationTime(), true);
|
|
|
+ fs.getFileStatus(fourthCacheFile).getModificationTime(), true, cfile4);
|
|
|
assertFalse("DistributedCache failed deleting old" +
|
|
|
" cache when the cache exceeds the number of sub directories limit.",
|
|
|
localfs.exists(thirdLocalCache));
|
|
@@ -624,11 +640,14 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
conf.set("user.name", userName);
|
|
|
Path fileToCache = new Path("fakefile:///"
|
|
|
+ firstCacheFile.toUri().getPath());
|
|
|
+ CacheFile file = new CacheFile(fileToCache.toUri(),
|
|
|
+ CacheFile.FileType.REGULAR,
|
|
|
+ false, 0, false);
|
|
|
Path result = manager.getLocalCache(fileToCache.toUri(), conf,
|
|
|
TaskTracker.getPrivateDistributedCacheDir(userName),
|
|
|
fs.getFileStatus(firstCacheFile), false,
|
|
|
System.currentTimeMillis(),
|
|
|
- false);
|
|
|
+ false, file);
|
|
|
assertNotNull("DistributedCache cached file on non-default filesystem.",
|
|
|
result);
|
|
|
}
|
|
@@ -802,21 +821,31 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
long now = System.currentTimeMillis();
|
|
|
|
|
|
Path[] localCache = new Path[2];
|
|
|
+ FileStatus stat = fs.getFileStatus(firstCacheFile);
|
|
|
+ CacheFile file = new CacheFile(firstCacheFilePublic.toUri(),
|
|
|
+ CacheFile.FileType.REGULAR, true,
|
|
|
+ stat.getModificationTime(), false);
|
|
|
localCache[0] = manager.getLocalCache(firstCacheFilePublic.toUri(), conf,
|
|
|
TaskTracker.getPrivateDistributedCacheDir(userName),
|
|
|
fs.getFileStatus(firstCacheFilePublic), false,
|
|
|
- fs.getFileStatus(firstCacheFilePublic).getModificationTime(), true);
|
|
|
+ fs.getFileStatus(firstCacheFilePublic).getModificationTime(), true,
|
|
|
+ file);
|
|
|
FsPermission myPermission = new FsPermission((short)0600);
|
|
|
Path myFile = new Path(localCache[0].getParent(), "myfile.txt");
|
|
|
if (FileSystem.create(localfs, myFile, myPermission) == null) {
|
|
|
throw new IOException("Could not create " + myFile);
|
|
|
}
|
|
|
try {
|
|
|
+ stat = fs.getFileStatus(secondCacheFilePublic);
|
|
|
+ file = new CacheFile(secondCacheFilePublic.toUri(),
|
|
|
+ CacheFile.FileType.REGULAR,
|
|
|
+ true, stat.getModificationTime(), false);
|
|
|
localCache[1] = manager.getLocalCache(secondCacheFilePublic.toUri(), conf,
|
|
|
TaskTracker.getPrivateDistributedCacheDir(userName),
|
|
|
fs.getFileStatus(secondCacheFilePublic), false,
|
|
|
- fs.getFileStatus(secondCacheFilePublic).getModificationTime(), true);
|
|
|
- FileStatus stat = localfs.getFileStatus(myFile);
|
|
|
+ fs.getFileStatus(secondCacheFilePublic).getModificationTime(), true,
|
|
|
+ file);
|
|
|
+ stat = localfs.getFileStatus(myFile);
|
|
|
assertTrue(stat.getPermission().equals(myPermission));
|
|
|
// validate permissions of localized files.
|
|
|
checkFilePermissions(localCache);
|