|
@@ -32,8 +32,12 @@ import javax.security.auth.login.LoginException;
|
|
|
|
|
|
import junit.framework.TestCase;
|
|
import junit.framework.TestCase;
|
|
|
|
|
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.mapred.DefaultTaskController;
|
|
import org.apache.hadoop.mapred.DefaultTaskController;
|
|
|
|
+import org.apache.hadoop.mapred.JobID;
|
|
|
|
+import org.apache.hadoop.mapred.JobLocalizer;
|
|
import org.apache.hadoop.mapred.TaskController;
|
|
import org.apache.hadoop.mapred.TaskController;
|
|
import org.apache.hadoop.mapred.TaskTracker;
|
|
import org.apache.hadoop.mapred.TaskTracker;
|
|
import org.apache.hadoop.mapreduce.Cluster;
|
|
import org.apache.hadoop.mapreduce.Cluster;
|
|
@@ -41,6 +45,9 @@ import org.apache.hadoop.mapreduce.Job;
|
|
import org.apache.hadoop.mapreduce.MRConfig;
|
|
import org.apache.hadoop.mapreduce.MRConfig;
|
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
|
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
|
|
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
|
|
|
|
+import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager.CacheFile;
|
|
|
|
+import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager.CacheStatus;
|
|
|
|
+import org.apache.hadoop.mapreduce.filecache.TestTrackerDistributedCacheManager;
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.FileUtil;
|
|
import org.apache.hadoop.fs.FileUtil;
|
|
@@ -54,10 +61,10 @@ import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
|
|
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
|
|
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.util.ReflectionUtils;
|
|
import org.apache.hadoop.util.ReflectionUtils;
|
|
-import org.mortbay.log.Log;
|
|
|
|
|
|
|
|
public class TestTrackerDistributedCacheManager extends TestCase {
|
|
public class TestTrackerDistributedCacheManager extends TestCase {
|
|
-
|
|
|
|
|
|
+ private static final Log LOG =
|
|
|
|
+ LogFactory.getLog(TestTrackerDistributedCacheManager.class);
|
|
protected String TEST_ROOT_DIR =
|
|
protected String TEST_ROOT_DIR =
|
|
new File(System.getProperty("test.build.data", "/tmp"),
|
|
new File(System.getProperty("test.build.data", "/tmp"),
|
|
TestTrackerDistributedCacheManager.class.getSimpleName())
|
|
TestTrackerDistributedCacheManager.class.getSimpleName())
|
|
@@ -68,10 +75,12 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
|
|
|
private static final int TEST_FILE_SIZE = 4 * 1024; // 4K
|
|
private static final int TEST_FILE_SIZE = 4 * 1024; // 4K
|
|
private static final int LOCAL_CACHE_LIMIT = 5 * 1024; //5K
|
|
private static final int LOCAL_CACHE_LIMIT = 5 * 1024; //5K
|
|
- private static final int LOCAL_CACHE_SUBDIR = 2;
|
|
|
|
|
|
+ private static final int LOCAL_CACHE_SUBDIR = 1;
|
|
protected Configuration conf;
|
|
protected Configuration conf;
|
|
protected Path firstCacheFile;
|
|
protected Path firstCacheFile;
|
|
|
|
+ protected Path firstCacheFilePublic;
|
|
protected Path secondCacheFile;
|
|
protected Path secondCacheFile;
|
|
|
|
+ protected Path secondCacheFilePublic;
|
|
private FileSystem fs;
|
|
private FileSystem fs;
|
|
|
|
|
|
protected LocalDirAllocator localDirAllocator =
|
|
protected LocalDirAllocator localDirAllocator =
|
|
@@ -119,18 +128,22 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
taskControllerClass, conf);
|
|
taskControllerClass, conf);
|
|
|
|
|
|
// setup permissions for mapred local dir
|
|
// setup permissions for mapred local dir
|
|
- taskController.setup();
|
|
|
|
|
|
+ taskController.setup(localDirAllocator);
|
|
|
|
|
|
// Create the temporary cache files to be used in the tests.
|
|
// Create the temporary cache files to be used in the tests.
|
|
firstCacheFile = new Path(TEST_ROOT_DIR, "firstcachefile");
|
|
firstCacheFile = new Path(TEST_ROOT_DIR, "firstcachefile");
|
|
secondCacheFile = new Path(TEST_ROOT_DIR, "secondcachefile");
|
|
secondCacheFile = new Path(TEST_ROOT_DIR, "secondcachefile");
|
|
|
|
+ firstCacheFilePublic = new Path(TEST_ROOT_DIR, "firstcachefileOne");
|
|
|
|
+ secondCacheFilePublic = new Path(TEST_ROOT_DIR, "secondcachefileOne");
|
|
|
|
+ createPublicTempFile(firstCacheFilePublic);
|
|
|
|
+ createPublicTempFile(secondCacheFilePublic);
|
|
createPrivateTempFile(firstCacheFile);
|
|
createPrivateTempFile(firstCacheFile);
|
|
createPrivateTempFile(secondCacheFile);
|
|
createPrivateTempFile(secondCacheFile);
|
|
}
|
|
}
|
|
|
|
|
|
protected void refreshConf(Configuration conf) throws IOException {
|
|
protected void refreshConf(Configuration conf) throws IOException {
|
|
taskController.setConf(conf);
|
|
taskController.setConf(conf);
|
|
- taskController.setup();
|
|
|
|
|
|
+ taskController.setup(localDirAllocator);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -148,7 +161,8 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
* @throws IOException
|
|
* @throws IOException
|
|
* @throws LoginException
|
|
* @throws LoginException
|
|
*/
|
|
*/
|
|
- public void testManagerFlow() throws IOException, LoginException {
|
|
|
|
|
|
+ public void testManagerFlow()
|
|
|
|
+ throws IOException, LoginException, InterruptedException {
|
|
if (!canRun()) {
|
|
if (!canRun()) {
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
@@ -158,6 +172,7 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
Configuration subConf = new Configuration(conf);
|
|
Configuration subConf = new Configuration(conf);
|
|
String userName = getJobOwnerName();
|
|
String userName = getJobOwnerName();
|
|
subConf.set(MRJobConfig.USER_NAME, userName);
|
|
subConf.set(MRJobConfig.USER_NAME, userName);
|
|
|
|
+ JobID jobid = new JobID("jt",1);
|
|
DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
|
|
DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
|
|
DistributedCache.addFileToClassPath(secondCacheFile, subConf);
|
|
DistributedCache.addFileToClassPath(secondCacheFile, subConf);
|
|
TrackerDistributedCacheManager.determineTimestamps(subConf);
|
|
TrackerDistributedCacheManager.determineTimestamps(subConf);
|
|
@@ -171,15 +186,18 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
|
|
|
// ****** Imitate TaskRunner code.
|
|
// ****** Imitate TaskRunner code.
|
|
TrackerDistributedCacheManager manager =
|
|
TrackerDistributedCacheManager manager =
|
|
- new TrackerDistributedCacheManager(conf, taskController);
|
|
|
|
|
|
+ new TrackerDistributedCacheManager(conf);
|
|
TaskDistributedCacheManager handle =
|
|
TaskDistributedCacheManager handle =
|
|
- manager.newTaskDistributedCacheManager(subConf);
|
|
|
|
|
|
+ manager.newTaskDistributedCacheManager(jobid, subConf);
|
|
assertNull(null, DistributedCache.getLocalCacheFiles(subConf));
|
|
assertNull(null, DistributedCache.getLocalCacheFiles(subConf));
|
|
File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString());
|
|
File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString());
|
|
- handle.setup(localDirAllocator, workDir, TaskTracker
|
|
|
|
- .getPrivateDistributedCacheDir(userName),
|
|
|
|
- TaskTracker.getPublicDistributedCacheDir());
|
|
|
|
- // ****** End of imitating TaskRunner code
|
|
|
|
|
|
+ handle.setupCache(subConf, TaskTracker.getPublicDistributedCacheDir(),
|
|
|
|
+ TaskTracker.getPrivateDistributedCacheDir(userName));
|
|
|
|
+ JobLocalizer.downloadPrivateCache(subConf);
|
|
|
|
+ // DOESN'T ACTUALLY HAPPEN IN THE TaskRunner (THIS IS A TODO)
|
|
|
|
+// handle.setupPrivateCache(localDirAllocator, TaskTracker
|
|
|
|
+// .getPrivateDistributedCacheDir(userName));
|
|
|
|
+// // ****** End of imitating TaskRunner code
|
|
|
|
|
|
Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf);
|
|
Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf);
|
|
assertNotNull(null, localCacheFiles);
|
|
assertNotNull(null, localCacheFiles);
|
|
@@ -208,18 +226,21 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
TrackerDistributedCacheManager {
|
|
TrackerDistributedCacheManager {
|
|
public FakeTrackerDistributedCacheManager(Configuration conf)
|
|
public FakeTrackerDistributedCacheManager(Configuration conf)
|
|
throws IOException {
|
|
throws IOException {
|
|
- super(conf, taskController);
|
|
|
|
|
|
+ super(conf);
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
- Path localizeCache(Configuration conf, URI cache, long confFileStamp,
|
|
|
|
- CacheStatus cacheStatus, boolean isArchive, boolean isPublic)
|
|
|
|
- throws IOException {
|
|
|
|
- if (cache.equals(firstCacheFile.toUri())) {
|
|
|
|
|
|
+ Path localizePublicCacheObject(Configuration conf, URI cache,
|
|
|
|
+ long confFileStamp,
|
|
|
|
+ CacheStatus cacheStatus,
|
|
|
|
+ FileStatus fileStatus,
|
|
|
|
+ boolean isArchive) throws IOException, InterruptedException {
|
|
|
|
+ if (cache.equals(firstCacheFilePublic.toUri())) {
|
|
throw new IOException("fake fail");
|
|
throw new IOException("fake fail");
|
|
}
|
|
}
|
|
- return super.localizeCache(conf, cache, confFileStamp, cacheStatus,
|
|
|
|
- isArchive, isPublic);
|
|
|
|
|
|
+ return super.localizePublicCacheObject(conf, cache, confFileStamp,
|
|
|
|
+ cacheStatus, fileStatus,
|
|
|
|
+ isArchive);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -230,57 +251,58 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
}
|
|
}
|
|
TrackerDistributedCacheManager manager =
|
|
TrackerDistributedCacheManager manager =
|
|
new FakeTrackerDistributedCacheManager(conf);
|
|
new FakeTrackerDistributedCacheManager(conf);
|
|
- Cluster cluster = new Cluster(conf);
|
|
|
|
|
|
+
|
|
String userName = getJobOwnerName();
|
|
String userName = getJobOwnerName();
|
|
File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString());
|
|
File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString());
|
|
|
|
|
|
// Configures a job with a regular file
|
|
// Configures a job with a regular file
|
|
- Job job1 = Job.getInstance(cluster, conf);
|
|
|
|
- job1.setUser(userName);
|
|
|
|
- job1.addCacheFile(secondCacheFile.toUri());
|
|
|
|
|
|
+ Job job1 = new Job(conf);
|
|
Configuration conf1 = job1.getConfiguration();
|
|
Configuration conf1 = job1.getConfiguration();
|
|
|
|
+ conf1.set("user.name", userName);
|
|
|
|
+ DistributedCache.addCacheFile(secondCacheFile.toUri(), conf1);
|
|
|
|
+
|
|
TrackerDistributedCacheManager.determineTimestamps(conf1);
|
|
TrackerDistributedCacheManager.determineTimestamps(conf1);
|
|
TrackerDistributedCacheManager.determineCacheVisibilities(conf1);
|
|
TrackerDistributedCacheManager.determineCacheVisibilities(conf1);
|
|
|
|
|
|
// Task localizing for first job
|
|
// Task localizing for first job
|
|
TaskDistributedCacheManager handle = manager
|
|
TaskDistributedCacheManager handle = manager
|
|
- .newTaskDistributedCacheManager(conf1);
|
|
|
|
- handle.setup(localDirAllocator, workDir, TaskTracker
|
|
|
|
- .getPrivateDistributedCacheDir(userName),
|
|
|
|
- TaskTracker.getPublicDistributedCacheDir());
|
|
|
|
|
|
+ .newTaskDistributedCacheManager(new JobID("jt", 1), conf1);
|
|
|
|
+ handle.setupCache(conf1, TaskTracker.getPublicDistributedCacheDir(),
|
|
|
|
+ TaskTracker.getPrivateDistributedCacheDir(userName));
|
|
|
|
+ JobLocalizer.downloadPrivateCache(conf1);
|
|
handle.release();
|
|
handle.release();
|
|
for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
|
|
for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
|
|
- assertEquals(0, manager.getReferenceCount(c.uri, conf1, c.timestamp,
|
|
|
|
- c.owner));
|
|
|
|
|
|
+ assertEquals(0, manager.getReferenceCount(c.getStatus()));
|
|
}
|
|
}
|
|
|
|
|
|
Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
|
|
Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
|
|
createPrivateTempFile(thirdCacheFile);
|
|
createPrivateTempFile(thirdCacheFile);
|
|
|
|
|
|
// Configures another job with three regular files.
|
|
// Configures another job with three regular files.
|
|
- Job job2 = Job.getInstance(cluster, conf);
|
|
|
|
- job2.setUser(userName);
|
|
|
|
|
|
+ Job job2 = new Job(conf);
|
|
|
|
+ Configuration conf2 = job2.getConfiguration();
|
|
|
|
+ conf2.set("user.name", userName);
|
|
// add a file that would get failed to localize
|
|
// add a file that would get failed to localize
|
|
- job2.addCacheFile(firstCacheFile.toUri());
|
|
|
|
|
|
+ DistributedCache.addCacheFile(firstCacheFilePublic.toUri(), conf2);
|
|
// add a file that is already localized by different job
|
|
// add a file that is already localized by different job
|
|
- job2.addCacheFile(secondCacheFile.toUri());
|
|
|
|
|
|
+ DistributedCache.addCacheFile(secondCacheFile.toUri(), conf2);
|
|
// add a file that is never localized
|
|
// add a file that is never localized
|
|
- job2.addCacheFile(thirdCacheFile.toUri());
|
|
|
|
- Configuration conf2 = job2.getConfiguration();
|
|
|
|
|
|
+ DistributedCache.addCacheFile(thirdCacheFile.toUri(), conf2);
|
|
|
|
+
|
|
TrackerDistributedCacheManager.determineTimestamps(conf2);
|
|
TrackerDistributedCacheManager.determineTimestamps(conf2);
|
|
TrackerDistributedCacheManager.determineCacheVisibilities(conf2);
|
|
TrackerDistributedCacheManager.determineCacheVisibilities(conf2);
|
|
|
|
|
|
// Task localizing for second job
|
|
// Task localizing for second job
|
|
// localization for the "firstCacheFile" will fail.
|
|
// localization for the "firstCacheFile" will fail.
|
|
- handle = manager.newTaskDistributedCacheManager(conf2);
|
|
|
|
|
|
+ handle = manager.newTaskDistributedCacheManager(new JobID("jt", 2), conf2);
|
|
Throwable th = null;
|
|
Throwable th = null;
|
|
try {
|
|
try {
|
|
- handle.setup(localDirAllocator, workDir, TaskTracker
|
|
|
|
- .getPrivateDistributedCacheDir(userName),
|
|
|
|
- TaskTracker.getPublicDistributedCacheDir());
|
|
|
|
|
|
+ handle.setupCache(conf2, TaskTracker.getPublicDistributedCacheDir(),
|
|
|
|
+ TaskTracker.getPrivateDistributedCacheDir(userName));
|
|
|
|
+ JobLocalizer.downloadPrivateCache(conf2);
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
th = e;
|
|
th = e;
|
|
- Log.info("Exception during setup", e);
|
|
|
|
|
|
+ LOG.info("Exception during setup", e);
|
|
}
|
|
}
|
|
assertNotNull(th);
|
|
assertNotNull(th);
|
|
assertTrue(th.getMessage().contains("fake fail"));
|
|
assertTrue(th.getMessage().contains("fake fail"));
|
|
@@ -288,15 +310,15 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
th = null;
|
|
th = null;
|
|
for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
|
|
for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
|
|
try {
|
|
try {
|
|
- assertEquals(0, manager.getReferenceCount(c.uri, conf2, c.timestamp,
|
|
|
|
- c.owner));
|
|
|
|
- } catch (IOException ie) {
|
|
|
|
|
|
+ int refcount = manager.getReferenceCount(c.getStatus());
|
|
|
|
+ LOG.info("checking refcount " + c.uri + " of " + refcount);
|
|
|
|
+ assertEquals(0, refcount);
|
|
|
|
+ } catch (NullPointerException ie) {
|
|
th = ie;
|
|
th = ie;
|
|
- Log.info("Exception getting reference count for " + c.uri, ie);
|
|
|
|
|
|
+ LOG.info("Exception getting reference count for " + c.uri, ie);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
assertNotNull(th);
|
|
assertNotNull(th);
|
|
- assertTrue(th.getMessage().contains(thirdCacheFile.getName()));
|
|
|
|
fs.delete(thirdCacheFile, false);
|
|
fs.delete(thirdCacheFile, false);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -361,7 +383,7 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
private Path checkLocalizedPath(boolean visibility)
|
|
private Path checkLocalizedPath(boolean visibility)
|
|
throws IOException, LoginException, InterruptedException {
|
|
throws IOException, LoginException, InterruptedException {
|
|
TrackerDistributedCacheManager manager =
|
|
TrackerDistributedCacheManager manager =
|
|
- new TrackerDistributedCacheManager(conf, taskController);
|
|
|
|
|
|
+ new TrackerDistributedCacheManager(conf);
|
|
Cluster cluster = new Cluster(conf);
|
|
Cluster cluster = new Cluster(conf);
|
|
String userName = getJobOwnerName();
|
|
String userName = getJobOwnerName();
|
|
File workDir = new File(TEST_ROOT_DIR, "workdir");
|
|
File workDir = new File(TEST_ROOT_DIR, "workdir");
|
|
@@ -381,10 +403,10 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
|
|
|
// Task localizing for job
|
|
// Task localizing for job
|
|
TaskDistributedCacheManager handle = manager
|
|
TaskDistributedCacheManager handle = manager
|
|
- .newTaskDistributedCacheManager(conf1);
|
|
|
|
- handle.setup(localDirAllocator, workDir, TaskTracker
|
|
|
|
- .getPrivateDistributedCacheDir(userName),
|
|
|
|
- TaskTracker.getPublicDistributedCacheDir());
|
|
|
|
|
|
+ .newTaskDistributedCacheManager(new JobID("jt", 1), conf1);
|
|
|
|
+ handle.setupCache(conf1, TaskTracker.getPublicDistributedCacheDir(),
|
|
|
|
+ TaskTracker.getPrivateDistributedCacheDir(userName));
|
|
|
|
+ JobLocalizer.downloadPrivateCache(conf1);
|
|
TaskDistributedCacheManager.CacheFile c = handle.getCacheFiles().get(0);
|
|
TaskDistributedCacheManager.CacheFile c = handle.getCacheFiles().get(0);
|
|
String distCacheDir;
|
|
String distCacheDir;
|
|
if (visibility) {
|
|
if (visibility) {
|
|
@@ -394,9 +416,8 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
}
|
|
}
|
|
Path localizedPath =
|
|
Path localizedPath =
|
|
manager.getLocalCache(cacheFile.toUri(), conf1, distCacheDir,
|
|
manager.getLocalCache(cacheFile.toUri(), conf1, distCacheDir,
|
|
- fs.getFileStatus(cacheFile), false,
|
|
|
|
- c.timestamp, new Path(TEST_ROOT_DIR), false,
|
|
|
|
- visibility);
|
|
|
|
|
|
+ fs.getFileStatus(cacheFile), false,
|
|
|
|
+ c.timestamp, visibility, c);
|
|
assertTrue("Cache file didn't get localized in the expected directory. " +
|
|
assertTrue("Cache file didn't get localized in the expected directory. " +
|
|
"Expected localization to happen within " +
|
|
"Expected localization to happen within " +
|
|
ROOT_MAPRED_LOCAL_DIR + "/" + distCacheDir +
|
|
ROOT_MAPRED_LOCAL_DIR + "/" + distCacheDir +
|
|
@@ -504,56 +525,94 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
Configuration conf2 = new Configuration(conf);
|
|
Configuration conf2 = new Configuration(conf);
|
|
conf2.set(MRConfig.LOCAL_DIR, ROOT_MAPRED_LOCAL_DIR.toString());
|
|
conf2.set(MRConfig.LOCAL_DIR, ROOT_MAPRED_LOCAL_DIR.toString());
|
|
conf2.setLong(TTConfig.TT_LOCAL_CACHE_SIZE, LOCAL_CACHE_LIMIT);
|
|
conf2.setLong(TTConfig.TT_LOCAL_CACHE_SIZE, LOCAL_CACHE_LIMIT);
|
|
- conf2.setLong(TTConfig.TT_LOCAL_CACHE_SUBDIRS_LIMIT, LOCAL_CACHE_SUBDIR);
|
|
|
|
conf2.setLong(TTConfig.TT_DISTRIBUTED_CACHE_CHECK_PERIOD, 200); // 200 ms
|
|
conf2.setLong(TTConfig.TT_DISTRIBUTED_CACHE_CHECK_PERIOD, 200); // 200 ms
|
|
refreshConf(conf2);
|
|
refreshConf(conf2);
|
|
TrackerDistributedCacheManager manager =
|
|
TrackerDistributedCacheManager manager =
|
|
- new TrackerDistributedCacheManager(conf2, taskController);
|
|
|
|
|
|
+ new TrackerDistributedCacheManager(conf2);
|
|
manager.startCleanupThread();
|
|
manager.startCleanupThread();
|
|
FileSystem localfs = FileSystem.getLocal(conf2);
|
|
FileSystem localfs = FileSystem.getLocal(conf2);
|
|
String userName = getJobOwnerName();
|
|
String userName = getJobOwnerName();
|
|
conf2.set(MRJobConfig.USER_NAME, userName);
|
|
conf2.set(MRJobConfig.USER_NAME, userName);
|
|
|
|
|
|
// We first test the size limit
|
|
// We first test the size limit
|
|
- Path localCache = manager.getLocalCache(firstCacheFile.toUri(), conf2,
|
|
|
|
|
|
+ FileStatus stat = fs.getFileStatus(firstCacheFilePublic);
|
|
|
|
+ CacheFile cfile1 = new CacheFile(firstCacheFilePublic.toUri(),
|
|
|
|
+ CacheFile.FileType.REGULAR, true,
|
|
|
|
+ stat.getModificationTime(),
|
|
|
|
+ true);
|
|
|
|
+ Path firstLocalCache = manager.getLocalCache(firstCacheFilePublic.toUri(), conf2,
|
|
TaskTracker.getPrivateDistributedCacheDir(userName),
|
|
TaskTracker.getPrivateDistributedCacheDir(userName),
|
|
- fs.getFileStatus(firstCacheFile), false,
|
|
|
|
- getFileStamp(firstCacheFile), new Path(TEST_ROOT_DIR), false, false);
|
|
|
|
- manager.releaseCache(firstCacheFile.toUri(), conf2,
|
|
|
|
- getFileStamp(firstCacheFile),
|
|
|
|
- TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
|
|
|
|
|
|
+ fs.getFileStatus(firstCacheFilePublic), false,
|
|
|
|
+ fs.getFileStatus(firstCacheFilePublic).getModificationTime(), true,
|
|
|
|
+ cfile1);
|
|
|
|
+ manager.releaseCache(cfile1.getStatus());
|
|
//in above code,localized a file of size 4K and then release the cache
|
|
//in above code,localized a file of size 4K and then release the cache
|
|
// which will cause the cache be deleted when the limit goes out.
|
|
// which will cause the cache be deleted when the limit goes out.
|
|
// The below code localize another cache which's designed to
|
|
// The below code localize another cache which's designed to
|
|
//sweep away the first cache.
|
|
//sweep away the first cache.
|
|
- manager.getLocalCache(secondCacheFile.toUri(), conf2,
|
|
|
|
|
|
+ stat = fs.getFileStatus(secondCacheFilePublic);
|
|
|
|
+ CacheFile cfile2 = new CacheFile(secondCacheFilePublic.toUri(),
|
|
|
|
+ CacheFile.FileType.REGULAR, true,
|
|
|
|
+ stat.getModificationTime(),
|
|
|
|
+ true);
|
|
|
|
+ assertTrue("DistributedCache currently doesn't have cached file",
|
|
|
|
+ localfs.exists(firstLocalCache));
|
|
|
|
+ Path secondLocalCache = manager.getLocalCache(secondCacheFilePublic.toUri(), conf2,
|
|
TaskTracker.getPrivateDistributedCacheDir(userName),
|
|
TaskTracker.getPrivateDistributedCacheDir(userName),
|
|
- fs.getFileStatus(secondCacheFile), false,
|
|
|
|
- getFileStamp(secondCacheFile), new Path(TEST_ROOT_DIR), false, false);
|
|
|
|
- checkCacheDeletion(localfs, localCache, "DistributedCache failed " +
|
|
|
|
|
|
+ fs.getFileStatus(secondCacheFilePublic), false,
|
|
|
|
+ fs.getFileStatus(secondCacheFilePublic).getModificationTime(), true,
|
|
|
|
+ cfile2);
|
|
|
|
+ checkCacheDeletion(localfs, firstLocalCache, "DistributedCache failed " +
|
|
"deleting old cache when the cache store is full.");
|
|
"deleting old cache when the cache store is full.");
|
|
|
|
+ manager.stopCleanupThread();
|
|
// Now we test the number of sub directories limit
|
|
// Now we test the number of sub directories limit
|
|
|
|
+ conf2.setLong(TTConfig.TT_LOCAL_CACHE_SUBDIRS_LIMIT, LOCAL_CACHE_SUBDIR);
|
|
|
|
+ conf2.setLong(TTConfig.TT_LOCAL_CACHE_SIZE, LOCAL_CACHE_LIMIT * 10);
|
|
|
|
+ conf2.setLong(TTConfig.TT_DISTRIBUTED_CACHE_CHECK_PERIOD, 200); // 200 ms
|
|
|
|
+ manager =
|
|
|
|
+ new TrackerDistributedCacheManager(conf2);
|
|
|
|
+ manager.startCleanupThread();
|
|
// Create the temporary cache files to be used in the tests.
|
|
// Create the temporary cache files to be used in the tests.
|
|
Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
|
|
Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
|
|
Path fourthCacheFile = new Path(TEST_ROOT_DIR, "fourthcachefile");
|
|
Path fourthCacheFile = new Path(TEST_ROOT_DIR, "fourthcachefile");
|
|
// Adding two more small files, so it triggers the number of sub directory
|
|
// Adding two more small files, so it triggers the number of sub directory
|
|
// limit but does not trigger the file size limit.
|
|
// limit but does not trigger the file size limit.
|
|
- createTempFile(thirdCacheFile, 1);
|
|
|
|
- createTempFile(fourthCacheFile, 1);
|
|
|
|
|
|
+ createPrivateTempFile(thirdCacheFile);
|
|
|
|
+ createPrivateTempFile(fourthCacheFile);
|
|
|
|
+ DistributedCache.setCacheFiles(new URI[]{thirdCacheFile.toUri()}, conf2);
|
|
|
|
+ TrackerDistributedCacheManager.determineCacheVisibilities(conf2);
|
|
|
|
+ TrackerDistributedCacheManager.determineTimestamps(conf2);
|
|
|
|
+ stat = fs.getFileStatus(thirdCacheFile);
|
|
|
|
+ CacheFile cfile3 = new CacheFile(thirdCacheFile.toUri(),
|
|
|
|
+ CacheFile.FileType.REGULAR, false,
|
|
|
|
+ stat.getModificationTime(),
|
|
|
|
+ true);
|
|
Path thirdLocalCache = manager.getLocalCache(thirdCacheFile.toUri(), conf2,
|
|
Path thirdLocalCache = manager.getLocalCache(thirdCacheFile.toUri(), conf2,
|
|
TaskTracker.getPrivateDistributedCacheDir(userName),
|
|
TaskTracker.getPrivateDistributedCacheDir(userName),
|
|
fs.getFileStatus(thirdCacheFile), false,
|
|
fs.getFileStatus(thirdCacheFile), false,
|
|
- getFileStamp(thirdCacheFile), new Path(TEST_ROOT_DIR), false, false);
|
|
|
|
|
|
+ fs.getFileStatus(thirdCacheFile).getModificationTime(),
|
|
|
|
+ false, cfile3);
|
|
|
|
+ DistributedCache.setLocalFiles(conf2, thirdLocalCache.toString());
|
|
|
|
+ JobLocalizer.downloadPrivateCache(conf2);
|
|
// Release the third cache so that it can be deleted while sweeping
|
|
// Release the third cache so that it can be deleted while sweeping
|
|
- manager.releaseCache(thirdCacheFile.toUri(), conf2,
|
|
|
|
- getFileStamp(thirdCacheFile),
|
|
|
|
- TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
|
|
|
|
|
|
+ manager.releaseCache(cfile3.getStatus());
|
|
// Getting the fourth cache will make the number of sub directories becomes
|
|
// Getting the fourth cache will make the number of sub directories becomes
|
|
// 3 which is greater than 2. So the released cache will be deleted.
|
|
// 3 which is greater than 2. So the released cache will be deleted.
|
|
- manager.getLocalCache(fourthCacheFile.toUri(), conf2,
|
|
|
|
|
|
+ stat = fs.getFileStatus(fourthCacheFile);
|
|
|
|
+ CacheFile cfile4 = new CacheFile(fourthCacheFile.toUri(),
|
|
|
|
+ CacheFile.FileType.REGULAR, false,
|
|
|
|
+ stat.getModificationTime(),
|
|
|
|
+ true);
|
|
|
|
+ assertTrue("DistributedCache currently doesn't have cached file",
|
|
|
|
+ localfs.exists(thirdLocalCache));
|
|
|
|
+ DistributedCache.setCacheFiles(new URI[]{fourthCacheFile.toUri()}, conf2);
|
|
|
|
+ DistributedCache.setLocalFiles(conf2, thirdCacheFile.toUri().toString());
|
|
|
|
+ TrackerDistributedCacheManager.determineCacheVisibilities(conf2);
|
|
|
|
+ TrackerDistributedCacheManager.determineTimestamps(conf2);
|
|
|
|
+ Path fourthLocalCache = manager.getLocalCache(fourthCacheFile.toUri(), conf2,
|
|
TaskTracker.getPrivateDistributedCacheDir(userName),
|
|
TaskTracker.getPrivateDistributedCacheDir(userName),
|
|
fs.getFileStatus(fourthCacheFile), false,
|
|
fs.getFileStatus(fourthCacheFile), false,
|
|
- getFileStamp(fourthCacheFile), new Path(TEST_ROOT_DIR), false, false);
|
|
|
|
|
|
+ fs.getFileStatus(fourthCacheFile).getModificationTime(), false, cfile4);
|
|
checkCacheDeletion(localfs, thirdLocalCache,
|
|
checkCacheDeletion(localfs, thirdLocalCache,
|
|
"DistributedCache failed deleting old" +
|
|
"DistributedCache failed deleting old" +
|
|
" cache when the cache exceeds the number of sub directories limit.");
|
|
" cache when the cache exceeds the number of sub directories limit.");
|
|
@@ -587,17 +646,20 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
TrackerDistributedCacheManager manager =
|
|
TrackerDistributedCacheManager manager =
|
|
- new TrackerDistributedCacheManager(conf, taskController);
|
|
|
|
|
|
+ new TrackerDistributedCacheManager(conf);
|
|
conf.set("fs.fakefile.impl", conf.get("fs.file.impl"));
|
|
conf.set("fs.fakefile.impl", conf.get("fs.file.impl"));
|
|
String userName = getJobOwnerName();
|
|
String userName = getJobOwnerName();
|
|
conf.set(MRJobConfig.USER_NAME, userName);
|
|
conf.set(MRJobConfig.USER_NAME, userName);
|
|
Path fileToCache = new Path("fakefile:///"
|
|
Path fileToCache = new Path("fakefile:///"
|
|
+ firstCacheFile.toUri().getPath());
|
|
+ firstCacheFile.toUri().getPath());
|
|
|
|
+ CacheFile file = new CacheFile(fileToCache.toUri(),
|
|
|
|
+ CacheFile.FileType.REGULAR,
|
|
|
|
+ false, 0, false);
|
|
Path result = manager.getLocalCache(fileToCache.toUri(), conf,
|
|
Path result = manager.getLocalCache(fileToCache.toUri(), conf,
|
|
TaskTracker.getPrivateDistributedCacheDir(userName),
|
|
TaskTracker.getPrivateDistributedCacheDir(userName),
|
|
fs.getFileStatus(firstCacheFile), false,
|
|
fs.getFileStatus(firstCacheFile), false,
|
|
- getFileStamp(firstCacheFile),
|
|
|
|
- new Path(TEST_ROOT_DIR), false, false);
|
|
|
|
|
|
+ System.currentTimeMillis(),
|
|
|
|
+ false, file);
|
|
assertNotNull("DistributedCache cached file on non-default filesystem.",
|
|
assertNotNull("DistributedCache cached file on non-default filesystem.",
|
|
result);
|
|
result);
|
|
}
|
|
}
|
|
@@ -632,6 +694,8 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
protected void tearDown() throws IOException {
|
|
protected void tearDown() throws IOException {
|
|
new File(firstCacheFile.toString()).delete();
|
|
new File(firstCacheFile.toString()).delete();
|
|
new File(secondCacheFile.toString()).delete();
|
|
new File(secondCacheFile.toString()).delete();
|
|
|
|
+ new File(firstCacheFilePublic.toString()).delete();
|
|
|
|
+ new File(secondCacheFilePublic.toString()).delete();
|
|
FileUtil.fullyDelete(new File(TEST_ROOT_DIR));
|
|
FileUtil.fullyDelete(new File(TEST_ROOT_DIR));
|
|
}
|
|
}
|
|
|
|
|
|
@@ -652,9 +716,13 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
}
|
|
}
|
|
|
|
|
|
public FileStatus getFileStatus(Path p) throws IOException {
|
|
public FileStatus getFileStatus(Path p) throws IOException {
|
|
|
|
+ FileStatus rawFileStatus = super.getFileStatus(p);
|
|
File f = pathToFile(p);
|
|
File f = pathToFile(p);
|
|
- return new FileStatus(f.length(), f.isDirectory(), 1, 128,
|
|
|
|
- f.lastModified() + increment, makeQualified(new Path(f.getPath())));
|
|
|
|
|
|
+ FileStatus status = new FileStatus(f.length(), f.isDirectory(), 1, 128,
|
|
|
|
+ f.lastModified() + increment, 0,
|
|
|
|
+ rawFileStatus.getPermission(), rawFileStatus.getOwner(),
|
|
|
|
+ rawFileStatus.getGroup(), makeQualified(new Path(f.getPath())));
|
|
|
|
+ return status;
|
|
}
|
|
}
|
|
|
|
|
|
void advanceClock(long millis) {
|
|
void advanceClock(long millis) {
|
|
@@ -672,7 +740,7 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
String userName = getJobOwnerName();
|
|
String userName = getJobOwnerName();
|
|
|
|
|
|
TrackerDistributedCacheManager manager =
|
|
TrackerDistributedCacheManager manager =
|
|
- new TrackerDistributedCacheManager(myConf, taskController);
|
|
|
|
|
|
+ new TrackerDistributedCacheManager(myConf);
|
|
// ****** Imitate JobClient code
|
|
// ****** Imitate JobClient code
|
|
// Configures a task/job with both a regular file and a "classpath" file.
|
|
// Configures a task/job with both a regular file and a "classpath" file.
|
|
Configuration subConf = new Configuration(myConf);
|
|
Configuration subConf = new Configuration(myConf);
|
|
@@ -684,14 +752,16 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
|
|
|
// ****** Imitate TaskRunner code.
|
|
// ****** Imitate TaskRunner code.
|
|
TaskDistributedCacheManager handle =
|
|
TaskDistributedCacheManager handle =
|
|
- manager.newTaskDistributedCacheManager(subConf);
|
|
|
|
|
|
+ manager.newTaskDistributedCacheManager(new JobID("jt", 1), subConf);
|
|
assertNull(null, DistributedCache.getLocalCacheFiles(subConf));
|
|
assertNull(null, DistributedCache.getLocalCacheFiles(subConf));
|
|
File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString());
|
|
File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString());
|
|
- handle.setup(localDirAllocator, workDir, TaskTracker
|
|
|
|
- .getPrivateDistributedCacheDir(userName),
|
|
|
|
- TaskTracker.getPublicDistributedCacheDir());
|
|
|
|
|
|
+ handle.setupCache(subConf, TaskTracker.getPublicDistributedCacheDir(),
|
|
|
|
+ TaskTracker.getPrivateDistributedCacheDir(userName));
|
|
|
|
+ //TODO this doesn't really happen in the TaskRunner
|
|
|
|
+// handle.setupPrivateCache(localDirAllocator, TaskTracker
|
|
|
|
+// .getPrivateDistributedCacheDir(userName));
|
|
// ****** End of imitating TaskRunner code
|
|
// ****** End of imitating TaskRunner code
|
|
-
|
|
|
|
|
|
+ JobLocalizer.downloadPrivateCache(subConf);
|
|
Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf);
|
|
Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf);
|
|
assertNotNull(null, localCacheFiles);
|
|
assertNotNull(null, localCacheFiles);
|
|
assertEquals(1, localCacheFiles.length);
|
|
assertEquals(1, localCacheFiles.length);
|
|
@@ -709,8 +779,10 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
// running a task of the same job
|
|
// running a task of the same job
|
|
Throwable th = null;
|
|
Throwable th = null;
|
|
try {
|
|
try {
|
|
- handle.setup(localDirAllocator, workDir, TaskTracker
|
|
|
|
- .getPrivateDistributedCacheDir(userName), TaskTracker.getPublicDistributedCacheDir());
|
|
|
|
|
|
+ handle.setupCache(subConf, TaskTracker.getPublicDistributedCacheDir(),
|
|
|
|
+ TaskTracker.getPrivateDistributedCacheDir(userName));
|
|
|
|
+// handle.setupPrivateCache(localDirAllocator, TaskTracker
|
|
|
|
+// .getPrivateDistributedCacheDir(userName));
|
|
} catch (IOException ie) {
|
|
} catch (IOException ie) {
|
|
th = ie;
|
|
th = ie;
|
|
}
|
|
}
|
|
@@ -723,21 +795,22 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
// running a task of the same job on another TaskTracker which has never
|
|
// running a task of the same job on another TaskTracker which has never
|
|
// initialized the cache
|
|
// initialized the cache
|
|
TrackerDistributedCacheManager manager2 =
|
|
TrackerDistributedCacheManager manager2 =
|
|
- new TrackerDistributedCacheManager(myConf, taskController);
|
|
|
|
|
|
+ new TrackerDistributedCacheManager(myConf);
|
|
TaskDistributedCacheManager handle2 =
|
|
TaskDistributedCacheManager handle2 =
|
|
- manager2.newTaskDistributedCacheManager(subConf);
|
|
|
|
|
|
+ manager2.newTaskDistributedCacheManager(new JobID("jt", 1), subConf);
|
|
File workDir2 = new File(new Path(TEST_ROOT_DIR, "workdir2").toString());
|
|
File workDir2 = new File(new Path(TEST_ROOT_DIR, "workdir2").toString());
|
|
th = null;
|
|
th = null;
|
|
try {
|
|
try {
|
|
- handle2.setup(localDirAllocator, workDir2, TaskTracker
|
|
|
|
|
|
+ handle2.setupCache(subConf, TaskTracker
|
|
.getPrivateDistributedCacheDir(userName),
|
|
.getPrivateDistributedCacheDir(userName),
|
|
TaskTracker.getPublicDistributedCacheDir());
|
|
TaskTracker.getPublicDistributedCacheDir());
|
|
|
|
+ JobLocalizer.downloadPrivateCache(subConf);
|
|
} catch (IOException ie) {
|
|
} catch (IOException ie) {
|
|
th = ie;
|
|
th = ie;
|
|
}
|
|
}
|
|
assertNotNull("Throwable is null", th);
|
|
assertNotNull("Throwable is null", th);
|
|
assertTrue("Exception message does not match",
|
|
assertTrue("Exception message does not match",
|
|
- th.getMessage().contains("has changed on HDFS since job started"));
|
|
|
|
|
|
+ th.getMessage().contains("changed during the job from"));
|
|
// release
|
|
// release
|
|
handle.release();
|
|
handle.release();
|
|
|
|
|
|
@@ -749,9 +822,10 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
TrackerDistributedCacheManager.determineCacheVisibilities(subConf2);
|
|
TrackerDistributedCacheManager.determineCacheVisibilities(subConf2);
|
|
|
|
|
|
handle =
|
|
handle =
|
|
- manager.newTaskDistributedCacheManager(subConf2);
|
|
|
|
- handle.setup(localDirAllocator, workDir, TaskTracker
|
|
|
|
- .getPrivateDistributedCacheDir(userName), TaskTracker.getPublicDistributedCacheDir());
|
|
|
|
|
|
+ manager.newTaskDistributedCacheManager(new JobID("jt", 2), subConf2);
|
|
|
|
+ handle.setupCache(subConf2, TaskTracker.getPublicDistributedCacheDir(),
|
|
|
|
+ TaskTracker.getPrivateDistributedCacheDir(userName));
|
|
|
|
+ JobLocalizer.downloadPrivateCache(subConf2);
|
|
Path[] localCacheFiles2 = DistributedCache.getLocalCacheFiles(subConf2);
|
|
Path[] localCacheFiles2 = DistributedCache.getLocalCacheFiles(subConf2);
|
|
assertNotNull(null, localCacheFiles2);
|
|
assertNotNull(null, localCacheFiles2);
|
|
assertEquals(1, localCacheFiles2.length);
|
|
assertEquals(1, localCacheFiles2.length);
|
|
@@ -781,26 +855,36 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
String userName = getJobOwnerName();
|
|
String userName = getJobOwnerName();
|
|
conf.set(MRJobConfig.USER_NAME, userName);
|
|
conf.set(MRJobConfig.USER_NAME, userName);
|
|
TrackerDistributedCacheManager manager =
|
|
TrackerDistributedCacheManager manager =
|
|
- new TrackerDistributedCacheManager(conf, taskController);
|
|
|
|
|
|
+ new TrackerDistributedCacheManager(conf);
|
|
FileSystem localfs = FileSystem.getLocal(conf);
|
|
FileSystem localfs = FileSystem.getLocal(conf);
|
|
|
|
+ long now = System.currentTimeMillis();
|
|
|
|
|
|
Path[] localCache = new Path[2];
|
|
Path[] localCache = new Path[2];
|
|
- localCache[0] = manager.getLocalCache(firstCacheFile.toUri(), conf,
|
|
|
|
|
|
+ FileStatus stat = fs.getFileStatus(firstCacheFile);
|
|
|
|
+ CacheFile file = new CacheFile(firstCacheFilePublic.toUri(),
|
|
|
|
+ CacheFile.FileType.REGULAR, true,
|
|
|
|
+ stat.getModificationTime(), false);
|
|
|
|
+ localCache[0] = manager.getLocalCache(firstCacheFilePublic.toUri(), conf,
|
|
TaskTracker.getPrivateDistributedCacheDir(userName),
|
|
TaskTracker.getPrivateDistributedCacheDir(userName),
|
|
- fs.getFileStatus(firstCacheFile), false,
|
|
|
|
- getFileStamp(firstCacheFile), new Path(TEST_ROOT_DIR), false, false);
|
|
|
|
|
|
+ fs.getFileStatus(firstCacheFilePublic), false,
|
|
|
|
+ fs.getFileStatus(firstCacheFilePublic).getModificationTime(), true,
|
|
|
|
+ file);
|
|
FsPermission myPermission = new FsPermission((short)0600);
|
|
FsPermission myPermission = new FsPermission((short)0600);
|
|
Path myFile = new Path(localCache[0].getParent(), "myfile.txt");
|
|
Path myFile = new Path(localCache[0].getParent(), "myfile.txt");
|
|
if (FileSystem.create(localfs, myFile, myPermission) == null) {
|
|
if (FileSystem.create(localfs, myFile, myPermission) == null) {
|
|
throw new IOException("Could not create " + myFile);
|
|
throw new IOException("Could not create " + myFile);
|
|
}
|
|
}
|
|
try {
|
|
try {
|
|
- localCache[1] = manager.getLocalCache(secondCacheFile.toUri(), conf,
|
|
|
|
|
|
+ stat = fs.getFileStatus(secondCacheFilePublic);
|
|
|
|
+ file = new CacheFile(secondCacheFilePublic.toUri(),
|
|
|
|
+ CacheFile.FileType.REGULAR,
|
|
|
|
+ true, stat.getModificationTime(), false);
|
|
|
|
+ localCache[1] = manager.getLocalCache(secondCacheFilePublic.toUri(), conf,
|
|
TaskTracker.getPrivateDistributedCacheDir(userName),
|
|
TaskTracker.getPrivateDistributedCacheDir(userName),
|
|
- fs.getFileStatus(secondCacheFile), false,
|
|
|
|
- getFileStamp(secondCacheFile), new Path(TEST_ROOT_DIR), false,
|
|
|
|
- false);
|
|
|
|
- FileStatus stat = localfs.getFileStatus(myFile);
|
|
|
|
|
|
+ fs.getFileStatus(secondCacheFilePublic), false,
|
|
|
|
+ fs.getFileStatus(secondCacheFilePublic).getModificationTime(), true,
|
|
|
|
+ file);
|
|
|
|
+ stat = localfs.getFileStatus(myFile);
|
|
assertTrue(stat.getPermission().equals(myPermission));
|
|
assertTrue(stat.getPermission().equals(myPermission));
|
|
// validate permissions of localized files.
|
|
// validate permissions of localized files.
|
|
checkFilePermissions(localCache);
|
|
checkFilePermissions(localCache);
|