|
@@ -75,7 +75,7 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
new LocalDirAllocator(JobConf.MAPRED_LOCAL_DIR_PROPERTY);
|
|
|
|
|
|
@Override
|
|
|
- protected void setUp() throws IOException {
|
|
|
+ protected void setUp() throws IOException,InterruptedException {
|
|
|
|
|
|
// Prepare the tests' root dir
|
|
|
File TEST_ROOT = new File(TEST_ROOT_DIR);
|
|
@@ -97,8 +97,8 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
// Create the temporary cache files to be used in the tests.
|
|
|
firstCacheFile = new Path(TEST_ROOT_DIR, "firstcachefile");
|
|
|
secondCacheFile = new Path(TEST_ROOT_DIR, "secondcachefile");
|
|
|
- createTempFile(firstCacheFile);
|
|
|
- createTempFile(secondCacheFile);
|
|
|
+ createPrivateTempFile(firstCacheFile);
|
|
|
+ createPrivateTempFile(secondCacheFile);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -127,6 +127,7 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
|
|
|
DistributedCache.addFileToClassPath(secondCacheFile, subConf);
|
|
|
TrackerDistributedCacheManager.determineTimestamps(subConf);
|
|
|
+ TrackerDistributedCacheManager.determineCacheVisibilities(subConf);
|
|
|
// ****** End of imitating JobClient code
|
|
|
|
|
|
Path jobFile = new Path(TEST_ROOT_DIR, "job.xml");
|
|
@@ -144,7 +145,8 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
assertNull(null, DistributedCache.getLocalCacheFiles(subConf));
|
|
|
File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString());
|
|
|
handle.setup(localDirAllocator, workDir, TaskTracker
|
|
|
- .getDistributedCacheDir(userName));
|
|
|
+ .getPrivateDistributedCacheDir(userName),
|
|
|
+ TaskTracker.getPublicDistributedCacheDir());
|
|
|
|
|
|
InitializationContext context = new InitializationContext();
|
|
|
context.user = userName;
|
|
@@ -195,7 +197,7 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
}
|
|
|
|
|
|
public void testReferenceCount() throws IOException, LoginException,
|
|
|
- URISyntaxException {
|
|
|
+ URISyntaxException, InterruptedException {
|
|
|
if (!canRun()) {
|
|
|
return;
|
|
|
}
|
|
@@ -213,19 +215,21 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
DistributedCache.addCacheFile(secondCacheFile.toUri(), conf1);
|
|
|
|
|
|
TrackerDistributedCacheManager.determineTimestamps(conf1);
|
|
|
+ TrackerDistributedCacheManager.determineCacheVisibilities(conf1);
|
|
|
|
|
|
// Task localizing for first job
|
|
|
TaskDistributedCacheManager handle = manager
|
|
|
.newTaskDistributedCacheManager(conf1);
|
|
|
handle.setup(localDirAllocator, workDir, TaskTracker
|
|
|
- .getDistributedCacheDir(userName));
|
|
|
+ .getPrivateDistributedCacheDir(userName),
|
|
|
+ TaskTracker.getPublicDistributedCacheDir());
|
|
|
handle.release();
|
|
|
for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
|
|
|
assertEquals(0, manager.getReferenceCount(c.uri, conf1, c.timestamp));
|
|
|
}
|
|
|
|
|
|
Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
|
|
|
- createTempFile(thirdCacheFile);
|
|
|
+ createPrivateTempFile(thirdCacheFile);
|
|
|
|
|
|
// Configures another job with three regular files.
|
|
|
Job job2 = new Job(conf);
|
|
@@ -238,6 +242,7 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
DistributedCache.addCacheFile(thirdCacheFile.toUri(), conf2);
|
|
|
|
|
|
TrackerDistributedCacheManager.determineTimestamps(conf2);
|
|
|
+ TrackerDistributedCacheManager.determineCacheVisibilities(conf2);
|
|
|
|
|
|
// Task localizing for second job
|
|
|
// localization for the "firstCacheFile" will fail.
|
|
@@ -245,7 +250,8 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
Throwable th = null;
|
|
|
try {
|
|
|
handle.setup(localDirAllocator, workDir, TaskTracker
|
|
|
- .getDistributedCacheDir(userName));
|
|
|
+ .getPrivateDistributedCacheDir(userName),
|
|
|
+ TaskTracker.getPublicDistributedCacheDir());
|
|
|
} catch (IOException e) {
|
|
|
th = e;
|
|
|
LOG.info("Exception during setup", e);
|
|
@@ -266,7 +272,64 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
assertTrue(th.getMessage().contains(thirdCacheFile.getName()));
|
|
|
fs.delete(thirdCacheFile, false);
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Tests that localization of distributed cache file happens in the desired
|
|
|
+ * directory
|
|
|
+ * @throws IOException
|
|
|
+ * @throws LoginException
|
|
|
+ */
|
|
|
+ public void testPublicPrivateCache()
|
|
|
+ throws IOException, LoginException, InterruptedException {
|
|
|
+ if (!canRun()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ checkLocalizedPath("true");
|
|
|
+ checkLocalizedPath("false");
|
|
|
+ }
|
|
|
+
|
|
|
+ private void checkLocalizedPath(String visibility)
|
|
|
+ throws IOException, LoginException, InterruptedException {
|
|
|
+ TrackerDistributedCacheManager manager =
|
|
|
+ new TrackerDistributedCacheManager(conf);
|
|
|
+ String userName = getJobOwnerName();
|
|
|
+ File workDir = new File(TEST_ROOT_DIR, "workdir");
|
|
|
+ Path cacheFile = new Path(TEST_ROOT_DIR, "fourthcachefile");
|
|
|
+ if ("true".equals(visibility)) {
|
|
|
+ createPublicTempFile(cacheFile);
|
|
|
+ } else {
|
|
|
+ createPrivateTempFile(cacheFile);
|
|
|
+ }
|
|
|
+
|
|
|
+ Configuration conf1 = new Configuration(conf);
|
|
|
+ DistributedCache.addCacheFile(cacheFile.toUri(), conf1);
|
|
|
+ TrackerDistributedCacheManager.determineTimestamps(conf1);
|
|
|
+ TrackerDistributedCacheManager.determineCacheVisibilities(conf1);
|
|
|
|
|
|
+ // Task localizing for job
|
|
|
+ TaskDistributedCacheManager handle = manager
|
|
|
+ .newTaskDistributedCacheManager(conf1);
|
|
|
+ handle.setup(localDirAllocator, workDir, TaskTracker
|
|
|
+ .getPrivateDistributedCacheDir(userName),
|
|
|
+ TaskTracker.getPublicDistributedCacheDir());
|
|
|
+ TaskDistributedCacheManager.CacheFile c = handle.getCacheFiles().get(0);
|
|
|
+ String distCacheDir;
|
|
|
+ if ("true".equals(visibility)) {
|
|
|
+ distCacheDir = TaskTracker.getPublicDistributedCacheDir();
|
|
|
+ } else {
|
|
|
+ distCacheDir = TaskTracker.getPrivateDistributedCacheDir(userName);
|
|
|
+ }
|
|
|
+ Path localizedPath =
|
|
|
+ manager.getLocalCache(cacheFile.toUri(), conf1, distCacheDir,
|
|
|
+ fs.getFileStatus(cacheFile), false,
|
|
|
+ c.timestamp, new Path(TEST_ROOT_DIR), false);
|
|
|
+ assertTrue("Cache file didn't get localized in the expected directory. " +
|
|
|
+ "Expected localization to happen within " +
|
|
|
+ ROOT_MAPRED_LOCAL_DIR + "/" + distCacheDir +
|
|
|
+ ", but was localized at " +
|
|
|
+ localizedPath, localizedPath.toString().contains(distCacheDir));
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Check proper permissions on the cache files
|
|
|
*
|
|
@@ -347,6 +410,18 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
os.close();
|
|
|
FileSystem.LOG.info("created: " + p + ", size=" + TEST_FILE_SIZE);
|
|
|
}
|
|
|
+
|
|
|
+ static void createPublicTempFile(Path p)
|
|
|
+ throws IOException, InterruptedException {
|
|
|
+ createTempFile(p);
|
|
|
+ FileUtil.chmod(p.toString(), "0777",true);
|
|
|
+ }
|
|
|
+
|
|
|
+ static void createPrivateTempFile(Path p)
|
|
|
+ throws IOException, InterruptedException {
|
|
|
+ createTempFile(p);
|
|
|
+ FileUtil.chmod(p.toString(), "0770",true);
|
|
|
+ }
|
|
|
|
|
|
@Override
|
|
|
protected void tearDown() throws IOException {
|
|
@@ -396,6 +471,7 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
Configuration subConf = new Configuration(myConf);
|
|
|
DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
|
|
|
TrackerDistributedCacheManager.determineTimestamps(subConf);
|
|
|
+ TrackerDistributedCacheManager.determineCacheVisibilities(subConf);
|
|
|
// ****** End of imitating JobClient code
|
|
|
|
|
|
String userName = getJobOwnerName();
|
|
@@ -406,7 +482,8 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
assertNull(null, DistributedCache.getLocalCacheFiles(subConf));
|
|
|
File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString());
|
|
|
handle.setup(localDirAllocator, workDir, TaskTracker
|
|
|
- .getDistributedCacheDir(userName));
|
|
|
+ .getPrivateDistributedCacheDir(userName),
|
|
|
+ TaskTracker.getPublicDistributedCacheDir());
|
|
|
// ****** End of imitating TaskRunner code
|
|
|
|
|
|
Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf);
|
|
@@ -427,7 +504,7 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
Throwable th = null;
|
|
|
try {
|
|
|
handle.setup(localDirAllocator, workDir, TaskTracker
|
|
|
- .getDistributedCacheDir(userName));
|
|
|
+ .getPrivateDistributedCacheDir(userName), TaskTracker.getPublicDistributedCacheDir());
|
|
|
} catch (IOException ie) {
|
|
|
th = ie;
|
|
|
}
|
|
@@ -441,11 +518,12 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
Configuration subConf2 = new Configuration(myConf);
|
|
|
DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf2);
|
|
|
TrackerDistributedCacheManager.determineTimestamps(subConf2);
|
|
|
+ TrackerDistributedCacheManager.determineCacheVisibilities(subConf2);
|
|
|
|
|
|
handle =
|
|
|
manager.newTaskDistributedCacheManager(subConf2);
|
|
|
handle.setup(localDirAllocator, workDir, TaskTracker
|
|
|
- .getDistributedCacheDir(userName));
|
|
|
+ .getPrivateDistributedCacheDir(userName), TaskTracker.getPublicDistributedCacheDir());
|
|
|
Path[] localCacheFiles2 = DistributedCache.getLocalCacheFiles(subConf2);
|
|
|
assertNotNull(null, localCacheFiles2);
|
|
|
assertEquals(1, localCacheFiles2.length);
|