|
@@ -24,6 +24,8 @@ import java.io.FileOutputStream;
|
|
|
import java.io.IOException;
|
|
|
import java.net.URI;
|
|
|
import java.net.URISyntaxException;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.Map;
|
|
|
import java.util.Random;
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
@@ -108,7 +110,7 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
assertTrue("Test root directory " + TEST_ROOT + " and all of its " +
|
|
|
"parent directories must have a+x permissions",
|
|
|
TrackerDistributedCacheManager.ancestorsHaveExecutePermissions(
|
|
|
- fs, new Path(TEST_ROOT.toString())));
|
|
|
+ fs, new Path(TEST_ROOT.toString()), new HashMap<URI, FileStatus>()));
|
|
|
|
|
|
// Prepare the tests' mapred-local-dir
|
|
|
ROOT_MAPRED_LOCAL_DIR = new File(TEST_ROOT_DIR, "mapred/local");
|
|
@@ -187,8 +189,11 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
|
|
|
DistributedCache.addFileToClassPath(secondCacheFile, subConf,
|
|
|
FileSystem.get(subConf));
|
|
|
- TrackerDistributedCacheManager.determineTimestamps(subConf);
|
|
|
- TrackerDistributedCacheManager.determineCacheVisibilities(subConf);
|
|
|
+
|
|
|
+ Map<URI, FileStatus> statCache = new HashMap<URI, FileStatus>();
|
|
|
+ TrackerDistributedCacheManager.determineTimestamps(subConf, statCache);
|
|
|
+ TrackerDistributedCacheManager.determineCacheVisibilities(subConf, statCache);
|
|
|
+ assertEquals(2, statCache.size());
|
|
|
// ****** End of imitating JobClient code
|
|
|
|
|
|
Path jobFile = new Path(TEST_ROOT_DIR, "job.xml");
|
|
@@ -273,8 +278,7 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
conf1.set("user.name", userName);
|
|
|
DistributedCache.addCacheFile(secondCacheFile.toUri(), conf1);
|
|
|
|
|
|
- TrackerDistributedCacheManager.determineTimestamps(conf1);
|
|
|
- TrackerDistributedCacheManager.determineCacheVisibilities(conf1);
|
|
|
+ TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf1);
|
|
|
|
|
|
// Task localizing for first job
|
|
|
JobID jobId = new JobID("jt", 1);
|
|
@@ -302,8 +306,7 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
DistributedCache.addCacheFile(firstCacheDirPublic.toUri(), conf2);
|
|
|
DistributedCache.addCacheFile(firstCacheDirPrivate.toUri(), conf2);
|
|
|
|
|
|
- TrackerDistributedCacheManager.determineTimestamps(conf2);
|
|
|
- TrackerDistributedCacheManager.determineCacheVisibilities(conf2);
|
|
|
+ TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf2);
|
|
|
|
|
|
// Task localizing for second job
|
|
|
JobID job2Id = new JobID("jt", 2);
|
|
@@ -339,8 +342,7 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
// add a file that is never localized
|
|
|
DistributedCache.addCacheFile(thirdCacheFile.toUri(), conf3);
|
|
|
|
|
|
- TrackerDistributedCacheManager.determineTimestamps(conf3);
|
|
|
- TrackerDistributedCacheManager.determineCacheVisibilities(conf3);
|
|
|
+ TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf3);
|
|
|
|
|
|
// Task localizing for third job
|
|
|
// localization for the "firstCacheFile" will fail.
|
|
@@ -379,7 +381,7 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
* @throws LoginException
|
|
|
*/
|
|
|
public void testPublicPrivateCache()
|
|
|
- throws IOException, LoginException, InterruptedException {
|
|
|
+ throws IOException, LoginException, InterruptedException, URISyntaxException {
|
|
|
if (!canRun()) {
|
|
|
return;
|
|
|
}
|
|
@@ -404,8 +406,7 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
|
|
|
DistributedCache.addCacheFile(cacheFile.toUri(), conf1);
|
|
|
DistributedCache.addCacheArchive(cacheFile.toUri(), conf1);
|
|
|
- TrackerDistributedCacheManager.determineTimestamps(conf1);
|
|
|
- TrackerDistributedCacheManager.determineCacheVisibilities(conf1);
|
|
|
+ TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf1);
|
|
|
dumpState(conf1);
|
|
|
|
|
|
TaskDistributedCacheManager handle = manager
|
|
@@ -491,7 +492,7 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
}
|
|
|
|
|
|
private void checkLocalizedPath(boolean visibility)
|
|
|
- throws IOException, LoginException, InterruptedException {
|
|
|
+ throws IOException, LoginException, InterruptedException, URISyntaxException {
|
|
|
TrackerDistributedCacheManager manager =
|
|
|
new TrackerDistributedCacheManager(conf, taskController);
|
|
|
String userName = getJobOwnerName();
|
|
@@ -506,8 +507,7 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
Configuration conf1 = new Configuration(conf);
|
|
|
conf1.set("user.name", userName);
|
|
|
DistributedCache.addCacheFile(cacheFile.toUri(), conf1);
|
|
|
- TrackerDistributedCacheManager.determineTimestamps(conf1);
|
|
|
- TrackerDistributedCacheManager.determineCacheVisibilities(conf1);
|
|
|
+ TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf1);
|
|
|
dumpState(conf1);
|
|
|
|
|
|
// Task localizing for job
|
|
@@ -894,8 +894,7 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
createPrivateTempFile(thirdCacheFile);
|
|
|
createPrivateTempFile(fourthCacheFile);
|
|
|
DistributedCache.setCacheFiles(new URI[]{thirdCacheFile.toUri()}, conf2);
|
|
|
- TrackerDistributedCacheManager.determineCacheVisibilities(conf2);
|
|
|
- TrackerDistributedCacheManager.determineTimestamps(conf2);
|
|
|
+ TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf2);
|
|
|
stat = fs.getFileStatus(thirdCacheFile);
|
|
|
CacheFile cfile3 = new CacheFile(thirdCacheFile.toUri(),
|
|
|
CacheFile.FileType.REGULAR, false,
|
|
@@ -922,8 +921,7 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
|
|
|
DistributedCache.setCacheFiles(new URI[]{fourthCacheFile.toUri()}, conf2);
|
|
|
DistributedCache.setLocalFiles(conf2, thirdCacheFile.toUri().toString());
|
|
|
- TrackerDistributedCacheManager.determineCacheVisibilities(conf2);
|
|
|
- TrackerDistributedCacheManager.determineTimestamps(conf2);
|
|
|
+ TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf2);
|
|
|
Path fourthLocalCache = manager.getLocalCache(fourthCacheFile.toUri(), conf2,
|
|
|
TaskTracker.getPrivateDistributedCacheDir(userName),
|
|
|
fs.getFileStatus(fourthCacheFile), false,
|
|
@@ -1100,8 +1098,7 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
Configuration subConf = new Configuration(myConf);
|
|
|
subConf.set("user.name", userName);
|
|
|
DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
|
|
|
- TrackerDistributedCacheManager.determineTimestamps(subConf);
|
|
|
- TrackerDistributedCacheManager.determineCacheVisibilities(subConf);
|
|
|
+ TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(subConf);
|
|
|
// ****** End of imitating JobClient code
|
|
|
|
|
|
// ****** Imitate TaskRunner code.
|
|
@@ -1150,8 +1147,7 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
Configuration subConf2 = new Configuration(myConf);
|
|
|
subConf2.set("user.name", userName);
|
|
|
DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf2);
|
|
|
- TrackerDistributedCacheManager.determineTimestamps(subConf2);
|
|
|
- TrackerDistributedCacheManager.determineCacheVisibilities(subConf2);
|
|
|
+ TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(subConf2);
|
|
|
|
|
|
handle =
|
|
|
manager.newTaskDistributedCacheManager(new JobID("jt", 2), subConf2);
|