|
@@ -34,10 +34,8 @@ import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.mapred.DefaultTaskController;
|
|
|
-import org.apache.hadoop.mapred.JobConf;
|
|
|
import org.apache.hadoop.mapred.TaskController;
|
|
|
import org.apache.hadoop.mapred.TaskTracker;
|
|
|
-import org.apache.hadoop.mapred.TaskController.InitializationContext;
|
|
|
import org.apache.hadoop.mapreduce.Job;
|
|
|
import org.apache.hadoop.filecache.DistributedCache;
|
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
@@ -46,10 +44,12 @@ import org.apache.hadoop.fs.FileUtil;
|
|
|
import org.apache.hadoop.fs.LocalDirAllocator;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.RawLocalFileSystem;
|
|
|
+import org.apache.hadoop.fs.permission.FsAction;
|
|
|
+import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
import org.apache.hadoop.filecache.TaskDistributedCacheManager;
|
|
|
import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
-
|
|
|
+import org.apache.hadoop.util.ReflectionUtils;
|
|
|
|
|
|
public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
private static final Log LOG =
|
|
@@ -61,7 +61,6 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
.getAbsolutePath();
|
|
|
|
|
|
protected File ROOT_MAPRED_LOCAL_DIR;
|
|
|
- private static String TEST_CACHE_BASE_DIR = "cachebasedir";
|
|
|
protected int numLocalDirs = 6;
|
|
|
|
|
|
private static final int TEST_FILE_SIZE = 4 * 1024; // 4K
|
|
@@ -72,7 +71,8 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
private FileSystem fs;
|
|
|
|
|
|
protected LocalDirAllocator localDirAllocator =
|
|
|
- new LocalDirAllocator(JobConf.MAPRED_LOCAL_DIR_PROPERTY);
|
|
|
+ new LocalDirAllocator("mapred.local.dir");
|
|
|
+ protected TaskController taskController;
|
|
|
|
|
|
@Override
|
|
|
protected void setUp() throws IOException,InterruptedException {
|
|
@@ -87,12 +87,25 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
ROOT_MAPRED_LOCAL_DIR = new File(TEST_ROOT_DIR, "mapred/local");
|
|
|
ROOT_MAPRED_LOCAL_DIR.mkdirs();
|
|
|
|
|
|
+ String []localDirs = new String[numLocalDirs];
|
|
|
+ for (int i = 0; i < numLocalDirs; i++) {
|
|
|
+ File localDir = new File(ROOT_MAPRED_LOCAL_DIR, "0_" + i);
|
|
|
+ localDirs[i] = localDir.getPath();
|
|
|
+ localDir.mkdir();
|
|
|
+ }
|
|
|
+
|
|
|
conf = new Configuration();
|
|
|
- conf.setLong("local.cache.size", LOCAL_CACHE_LIMIT);
|
|
|
- conf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY,
|
|
|
- ROOT_MAPRED_LOCAL_DIR.toString());
|
|
|
+ conf.setStrings("mapred.local.dir", localDirs);
|
|
|
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
|
|
|
fs = FileSystem.get(conf);
|
|
|
+ Class<? extends TaskController> taskControllerClass = conf.getClass(
|
|
|
+ "mapred.task.tracker.task-controller", DefaultTaskController.class,
|
|
|
+ TaskController.class);
|
|
|
+ taskController = (TaskController) ReflectionUtils.newInstance(
|
|
|
+ taskControllerClass, conf);
|
|
|
+
|
|
|
+ // setup permissions for mapred local dir
|
|
|
+ taskController.setup();
|
|
|
|
|
|
// Create the temporary cache files to be used in the tests.
|
|
|
firstCacheFile = new Path(TEST_ROOT_DIR, "firstcachefile");
|
|
@@ -100,6 +113,11 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
createPrivateTempFile(firstCacheFile);
|
|
|
createPrivateTempFile(secondCacheFile);
|
|
|
}
|
|
|
+
|
|
|
+ protected void refreshConf(Configuration conf) throws IOException {
|
|
|
+ taskController.setConf(conf);
|
|
|
+ taskController.setup();
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
|
* Whether the test can run on the machine
|
|
@@ -124,6 +142,8 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
// ****** Imitate JobClient code
|
|
|
// Configures a task/job with both a regular file and a "classpath" file.
|
|
|
Configuration subConf = new Configuration(conf);
|
|
|
+ String userName = getJobOwnerName();
|
|
|
+ subConf.set("user.name", userName);
|
|
|
DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
|
|
|
DistributedCache.addFileToClassPath(secondCacheFile, subConf);
|
|
|
TrackerDistributedCacheManager.determineTimestamps(subConf);
|
|
@@ -135,11 +155,9 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
subConf.writeXml(os);
|
|
|
os.close();
|
|
|
|
|
|
- String userName = getJobOwnerName();
|
|
|
-
|
|
|
// ****** Imitate TaskRunner code.
|
|
|
TrackerDistributedCacheManager manager =
|
|
|
- new TrackerDistributedCacheManager(conf);
|
|
|
+ new TrackerDistributedCacheManager(conf, taskController);
|
|
|
TaskDistributedCacheManager handle =
|
|
|
manager.newTaskDistributedCacheManager(subConf);
|
|
|
assertNull(null, DistributedCache.getLocalCacheFiles(subConf));
|
|
@@ -147,11 +165,6 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
handle.setup(localDirAllocator, workDir, TaskTracker
|
|
|
.getPrivateDistributedCacheDir(userName),
|
|
|
TaskTracker.getPublicDistributedCacheDir());
|
|
|
-
|
|
|
- InitializationContext context = new InitializationContext();
|
|
|
- context.user = userName;
|
|
|
- context.workDir = workDir;
|
|
|
- getTaskController().initializeDistributedCache(context);
|
|
|
// ****** End of imitating TaskRunner code
|
|
|
|
|
|
Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf);
|
|
@@ -181,18 +194,18 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
TrackerDistributedCacheManager {
|
|
|
public FakeTrackerDistributedCacheManager(Configuration conf)
|
|
|
throws IOException {
|
|
|
- super(conf);
|
|
|
+ super(conf, taskController);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
Path localizeCache(Configuration conf, URI cache, long confFileStamp,
|
|
|
- CacheStatus cacheStatus, FileStatus fileStatus, boolean isArchive)
|
|
|
- throws IOException {
|
|
|
+ CacheStatus cacheStatus, FileStatus fileStatus, boolean isArchive,
|
|
|
+ boolean isPublic) throws IOException {
|
|
|
if (cache.equals(firstCacheFile.toUri())) {
|
|
|
throw new IOException("fake fail");
|
|
|
}
|
|
|
return super.localizeCache(conf, cache, confFileStamp, cacheStatus,
|
|
|
- fileStatus, isArchive);
|
|
|
+ fileStatus, isArchive, isPublic);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -201,8 +214,6 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
if (!canRun()) {
|
|
|
return;
|
|
|
}
|
|
|
- Configuration conf = new Configuration();
|
|
|
- conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
|
|
|
TrackerDistributedCacheManager manager =
|
|
|
new FakeTrackerDistributedCacheManager(conf);
|
|
|
|
|
@@ -212,6 +223,7 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
// Configures a job with a regular file
|
|
|
Job job1 = new Job(conf);
|
|
|
Configuration conf1 = job1.getConfiguration();
|
|
|
+ conf1.set("user.name", userName);
|
|
|
DistributedCache.addCacheFile(secondCacheFile.toUri(), conf1);
|
|
|
|
|
|
TrackerDistributedCacheManager.determineTimestamps(conf1);
|
|
@@ -234,6 +246,7 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
// Configures another job with three regular files.
|
|
|
Job job2 = new Job(conf);
|
|
|
Configuration conf2 = job2.getConfiguration();
|
|
|
+ conf2.set("user.name", userName);
|
|
|
// add a file that would get failed to localize
|
|
|
DistributedCache.addCacheFile(firstCacheFile.toUri(), conf2);
|
|
|
// add a file that is already localized by different job
|
|
@@ -291,7 +304,7 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
private void checkLocalizedPath(String visibility)
|
|
|
throws IOException, LoginException, InterruptedException {
|
|
|
TrackerDistributedCacheManager manager =
|
|
|
- new TrackerDistributedCacheManager(conf);
|
|
|
+ new TrackerDistributedCacheManager(conf, taskController);
|
|
|
String userName = getJobOwnerName();
|
|
|
File workDir = new File(TEST_ROOT_DIR, "workdir");
|
|
|
Path cacheFile = new Path(TEST_ROOT_DIR, "fourthcachefile");
|
|
@@ -302,6 +315,7 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
}
|
|
|
|
|
|
Configuration conf1 = new Configuration(conf);
|
|
|
+ conf1.set("user.name", userName);
|
|
|
DistributedCache.addCacheFile(cacheFile.toUri(), conf1);
|
|
|
TrackerDistributedCacheManager.determineTimestamps(conf1);
|
|
|
TrackerDistributedCacheManager.determineCacheVisibilities(conf1);
|
|
@@ -322,12 +336,18 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
Path localizedPath =
|
|
|
manager.getLocalCache(cacheFile.toUri(), conf1, distCacheDir,
|
|
|
fs.getFileStatus(cacheFile), false,
|
|
|
- c.timestamp, new Path(TEST_ROOT_DIR), false);
|
|
|
+ c.timestamp, new Path(TEST_ROOT_DIR), false,
|
|
|
+ Boolean.parseBoolean(visibility));
|
|
|
assertTrue("Cache file didn't get localized in the expected directory. " +
|
|
|
"Expected localization to happen within " +
|
|
|
ROOT_MAPRED_LOCAL_DIR + "/" + distCacheDir +
|
|
|
", but was localized at " +
|
|
|
localizedPath, localizedPath.toString().contains(distCacheDir));
|
|
|
+ if ("true".equals(visibility)) {
|
|
|
+ checkPublicFilePermissions(new Path[]{localizedPath});
|
|
|
+ } else {
|
|
|
+ checkFilePermissions(new Path[]{localizedPath});
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -338,17 +358,29 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
*/
|
|
|
protected void checkFilePermissions(Path[] localCacheFiles)
|
|
|
throws IOException {
|
|
|
- Path cachedFirstFile = localCacheFiles[0];
|
|
|
- Path cachedSecondFile = localCacheFiles[1];
|
|
|
- // Both the files should have executable permissions on them.
|
|
|
- assertTrue("First cache file is not executable!", new File(cachedFirstFile
|
|
|
- .toUri().getPath()).canExecute());
|
|
|
- assertTrue("Second cache file is not executable!", new File(
|
|
|
- cachedSecondFile.toUri().getPath()).canExecute());
|
|
|
+ // All the files should have executable permissions on them.
|
|
|
+ for (Path p : localCacheFiles) {
|
|
|
+ assertTrue("Cache file is not executable!", new File(p
|
|
|
+ .toUri().getPath()).canExecute());
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- protected TaskController getTaskController() {
|
|
|
- return new DefaultTaskController();
|
|
|
+ /**
|
|
|
+ * Check permissions on the public cache files
|
|
|
+ *
|
|
|
+ * @param localCacheFiles
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ private void checkPublicFilePermissions(Path[] localCacheFiles)
|
|
|
+ throws IOException {
|
|
|
+ // All the files should have read and executable permissions for others
|
|
|
+ for (Path p : localCacheFiles) {
|
|
|
+ FsPermission perm = fs.getFileStatus(p).getPermission();
|
|
|
+ assertTrue("cache file is not readable by others", perm.getOtherAction()
|
|
|
+ .implies(FsAction.READ));
|
|
|
+ assertTrue("cache file is not executable by others", perm
|
|
|
+ .getOtherAction().implies(FsAction.EXECUTE));
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
protected String getJobOwnerName() throws LoginException {
|
|
@@ -361,27 +393,39 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
if (!canRun()) {
|
|
|
return;
|
|
|
}
|
|
|
+ // This test needs mapred.local.dir to be single directory
|
|
|
+ // instead of four, because it assumes that both
|
|
|
+ // firstcachefile and secondcachefile will be localized on same directory
|
|
|
+ // so that second localization triggers deleteCache.
|
|
|
+ // If mapred.local.dir is four directories, second localization might not
|
|
|
+ // trigger deleteCache, if it is localized in different directory.
|
|
|
+ Configuration conf2 = new Configuration(conf);
|
|
|
+ conf2.set("mapred.local.dir", ROOT_MAPRED_LOCAL_DIR.toString());
|
|
|
+ conf2.setLong("local.cache.size", LOCAL_CACHE_LIMIT);
|
|
|
+ refreshConf(conf2);
|
|
|
TrackerDistributedCacheManager manager =
|
|
|
- new TrackerDistributedCacheManager(conf);
|
|
|
- FileSystem localfs = FileSystem.getLocal(conf);
|
|
|
+ new TrackerDistributedCacheManager(conf2, taskController);
|
|
|
+ FileSystem localfs = FileSystem.getLocal(conf2);
|
|
|
long now = System.currentTimeMillis();
|
|
|
+ String userName = getJobOwnerName();
|
|
|
+ conf2.set("user.name", userName);
|
|
|
|
|
|
- manager.getLocalCache(firstCacheFile.toUri(), conf,
|
|
|
- TEST_CACHE_BASE_DIR, fs.getFileStatus(firstCacheFile), false,
|
|
|
- now, new Path(TEST_ROOT_DIR), false);
|
|
|
- manager.releaseCache(firstCacheFile.toUri(), conf, now);
|
|
|
+ Path localCache = manager.getLocalCache(firstCacheFile.toUri(), conf2,
|
|
|
+ TaskTracker.getPrivateDistributedCacheDir(userName),
|
|
|
+ fs.getFileStatus(firstCacheFile), false,
|
|
|
+ now, new Path(TEST_ROOT_DIR), false, false);
|
|
|
+ manager.releaseCache(firstCacheFile.toUri(), conf2, now);
|
|
|
//in above code,localized a file of size 4K and then release the cache
|
|
|
// which will cause the cache be deleted when the limit goes out.
|
|
|
// The below code localize another cache which's designed to
|
|
|
//sweep away the first cache.
|
|
|
- manager.getLocalCache(secondCacheFile.toUri(), conf,
|
|
|
- TEST_CACHE_BASE_DIR, fs.getFileStatus(secondCacheFile), false,
|
|
|
- System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false);
|
|
|
- FileStatus[] dirStatuses = localfs.listStatus(
|
|
|
- new Path(ROOT_MAPRED_LOCAL_DIR.toString()));
|
|
|
- assertTrue("DistributedCache failed deleting old" +
|
|
|
+ manager.getLocalCache(secondCacheFile.toUri(), conf2,
|
|
|
+ TaskTracker.getPrivateDistributedCacheDir(userName),
|
|
|
+ fs.getFileStatus(secondCacheFile), false,
|
|
|
+ System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false, false);
|
|
|
+ assertFalse("DistributedCache failed deleting old" +
|
|
|
" cache when the cache store is full.",
|
|
|
- dirStatuses.length == 1);
|
|
|
+ localfs.exists(localCache));
|
|
|
}
|
|
|
|
|
|
public void testFileSystemOtherThanDefault() throws Exception {
|
|
@@ -389,14 +433,17 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
return;
|
|
|
}
|
|
|
TrackerDistributedCacheManager manager =
|
|
|
- new TrackerDistributedCacheManager(conf);
|
|
|
+ new TrackerDistributedCacheManager(conf, taskController);
|
|
|
conf.set("fs.fakefile.impl", conf.get("fs.file.impl"));
|
|
|
+ String userName = getJobOwnerName();
|
|
|
+ conf.set("user.name", userName);
|
|
|
Path fileToCache = new Path("fakefile:///"
|
|
|
+ firstCacheFile.toUri().getPath());
|
|
|
Path result = manager.getLocalCache(fileToCache.toUri(), conf,
|
|
|
- TEST_CACHE_BASE_DIR, fs.getFileStatus(firstCacheFile), false,
|
|
|
+ TaskTracker.getPrivateDistributedCacheDir(userName),
|
|
|
+ fs.getFileStatus(firstCacheFile), false,
|
|
|
System.currentTimeMillis(),
|
|
|
- new Path(TEST_ROOT_DIR), false);
|
|
|
+ new Path(TEST_ROOT_DIR), false, false);
|
|
|
assertNotNull("DistributedCache cached file on non-default filesystem.",
|
|
|
result);
|
|
|
}
|
|
@@ -464,18 +511,19 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
Configuration myConf = new Configuration(conf);
|
|
|
myConf.set("fs.default.name", "refresh:///");
|
|
|
myConf.setClass("fs.refresh.impl", FakeFileSystem.class, FileSystem.class);
|
|
|
+ String userName = getJobOwnerName();
|
|
|
+
|
|
|
TrackerDistributedCacheManager manager =
|
|
|
- new TrackerDistributedCacheManager(myConf);
|
|
|
+ new TrackerDistributedCacheManager(myConf, taskController);
|
|
|
// ****** Imitate JobClient code
|
|
|
// Configures a task/job with both a regular file and a "classpath" file.
|
|
|
Configuration subConf = new Configuration(myConf);
|
|
|
+ subConf.set("user.name", userName);
|
|
|
DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
|
|
|
TrackerDistributedCacheManager.determineTimestamps(subConf);
|
|
|
TrackerDistributedCacheManager.determineCacheVisibilities(subConf);
|
|
|
// ****** End of imitating JobClient code
|
|
|
|
|
|
- String userName = getJobOwnerName();
|
|
|
-
|
|
|
// ****** Imitate TaskRunner code.
|
|
|
TaskDistributedCacheManager handle =
|
|
|
manager.newTaskDistributedCacheManager(subConf);
|
|
@@ -516,6 +564,7 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
|
|
|
// submit another job
|
|
|
Configuration subConf2 = new Configuration(myConf);
|
|
|
+ subConf2.set("user.name", userName);
|
|
|
DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf2);
|
|
|
TrackerDistributedCacheManager.determineTimestamps(subConf2);
|
|
|
TrackerDistributedCacheManager.determineCacheVisibilities(subConf2);
|
|
@@ -539,4 +588,46 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
handle.release();
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Localize a file. After localization is complete, create a file, "myFile",
|
|
|
+ * under the directory where the file is localized and ensure that it has
|
|
|
+ * permissions different from what is set by default. Then, localize another
|
|
|
+ * file. Verify that "myFile" has the right permissions.
|
|
|
+ * @throws Exception
|
|
|
+ */
|
|
|
+ public void testCustomPermissions() throws Exception {
|
|
|
+ if (!canRun()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ String userName = getJobOwnerName();
|
|
|
+ conf.set("user.name", userName);
|
|
|
+ TrackerDistributedCacheManager manager =
|
|
|
+ new TrackerDistributedCacheManager(conf, taskController);
|
|
|
+ FileSystem localfs = FileSystem.getLocal(conf);
|
|
|
+ long now = System.currentTimeMillis();
|
|
|
+
|
|
|
+ Path[] localCache = new Path[2];
|
|
|
+ localCache[0] = manager.getLocalCache(firstCacheFile.toUri(), conf,
|
|
|
+ TaskTracker.getPrivateDistributedCacheDir(userName),
|
|
|
+ fs.getFileStatus(firstCacheFile), false,
|
|
|
+ now, new Path(TEST_ROOT_DIR), false, false);
|
|
|
+ FsPermission myPermission = new FsPermission((short)0600);
|
|
|
+ Path myFile = new Path(localCache[0].getParent(), "myfile.txt");
|
|
|
+ if (FileSystem.create(localfs, myFile, myPermission) == null) {
|
|
|
+ throw new IOException("Could not create " + myFile);
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ localCache[1] = manager.getLocalCache(secondCacheFile.toUri(), conf,
|
|
|
+ TaskTracker.getPrivateDistributedCacheDir(userName),
|
|
|
+ fs.getFileStatus(secondCacheFile), false,
|
|
|
+ System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false, false);
|
|
|
+ FileStatus stat = localfs.getFileStatus(myFile);
|
|
|
+ assertTrue(stat.getPermission().equals(myPermission));
|
|
|
+ // validate permissions of localized files.
|
|
|
+ checkFilePermissions(localCache);
|
|
|
+ } finally {
|
|
|
+ localfs.delete(myFile, false);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
}
|