|
@@ -22,18 +22,23 @@ import java.io.File;
|
|
|
import java.io.FileNotFoundException;
|
|
|
import java.io.FileOutputStream;
|
|
|
import java.io.IOException;
|
|
|
+import java.net.URI;
|
|
|
+import java.net.URISyntaxException;
|
|
|
import java.util.Random;
|
|
|
|
|
|
import javax.security.auth.login.LoginException;
|
|
|
|
|
|
import junit.framework.TestCase;
|
|
|
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.mapred.DefaultTaskController;
|
|
|
import org.apache.hadoop.mapred.JobConf;
|
|
|
import org.apache.hadoop.mapred.TaskController;
|
|
|
import org.apache.hadoop.mapred.TaskTracker;
|
|
|
import org.apache.hadoop.mapred.TaskController.InitializationContext;
|
|
|
+import org.apache.hadoop.mapreduce.Job;
|
|
|
import org.apache.hadoop.filecache.DistributedCache;
|
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
@@ -45,7 +50,10 @@ import org.apache.hadoop.filecache.TaskDistributedCacheManager;
|
|
|
import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
|
|
|
+
|
|
|
public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
+ private static final Log LOG =
|
|
|
+ LogFactory.getLog(TestTrackerDistributedCacheManager.class);
|
|
|
|
|
|
protected String TEST_ROOT_DIR =
|
|
|
new File(System.getProperty("test.build.data", "/tmp"),
|
|
@@ -93,6 +101,15 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
createTempFile(secondCacheFile);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Whether the test can run on the machine
|
|
|
+ *
|
|
|
+ * @return true if test can run on the machine, false otherwise
|
|
|
+ */
|
|
|
+ protected boolean canRun() {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* This is the typical flow for using the DistributedCache classes.
|
|
|
*
|
|
@@ -100,6 +117,9 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
* @throws LoginException
|
|
|
*/
|
|
|
public void testManagerFlow() throws IOException, LoginException {
|
|
|
+ if (!canRun()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
// ****** Imitate JobClient code
|
|
|
// Configures a task/job with both a regular file and a "classpath" file.
|
|
@@ -152,6 +172,101 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
assertFalse(pathToFile(cachedFirstFile).exists());
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * This DistributedCacheManager fails in localizing firstCacheFile.
|
|
|
+ */
|
|
|
+ public class FakeTrackerDistributedCacheManager extends
|
|
|
+ TrackerDistributedCacheManager {
|
|
|
+ public FakeTrackerDistributedCacheManager(Configuration conf)
|
|
|
+ throws IOException {
|
|
|
+ super(conf);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ Path localizeCache(Configuration conf, URI cache, long confFileStamp,
|
|
|
+ CacheStatus cacheStatus, FileStatus fileStatus, boolean isArchive)
|
|
|
+ throws IOException {
|
|
|
+ if (cache.equals(firstCacheFile.toUri())) {
|
|
|
+ throw new IOException("fake fail");
|
|
|
+ }
|
|
|
+ return super.localizeCache(conf, cache, confFileStamp, cacheStatus,
|
|
|
+ fileStatus, isArchive);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void testReferenceCount() throws IOException, LoginException,
|
|
|
+ URISyntaxException {
|
|
|
+ if (!canRun()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
|
|
|
+ TrackerDistributedCacheManager manager =
|
|
|
+ new FakeTrackerDistributedCacheManager(conf);
|
|
|
+
|
|
|
+ String userName = getJobOwnerName();
|
|
|
+ File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString());
|
|
|
+
|
|
|
+ // Configures a job with a regular file
|
|
|
+ Job job1 = new Job(conf);
|
|
|
+ Configuration conf1 = job1.getConfiguration();
|
|
|
+ DistributedCache.addCacheFile(secondCacheFile.toUri(), conf1);
|
|
|
+
|
|
|
+ TrackerDistributedCacheManager.determineTimestamps(conf1);
|
|
|
+
|
|
|
+ // Task localizing for first job
|
|
|
+ TaskDistributedCacheManager handle = manager
|
|
|
+ .newTaskDistributedCacheManager(conf1);
|
|
|
+ handle.setup(localDirAllocator, workDir, TaskTracker
|
|
|
+ .getDistributedCacheDir(userName));
|
|
|
+ handle.release();
|
|
|
+ for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
|
|
|
+ assertEquals(0, manager.getReferenceCount(c.uri, conf1, c.timestamp));
|
|
|
+ }
|
|
|
+
|
|
|
+ Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
|
|
|
+ createTempFile(thirdCacheFile);
|
|
|
+
|
|
|
+ // Configures another job with three regular files.
|
|
|
+ Job job2 = new Job(conf);
|
|
|
+ Configuration conf2 = job2.getConfiguration();
|
|
|
+ // add a file that would get failed to localize
|
|
|
+ DistributedCache.addCacheFile(firstCacheFile.toUri(), conf2);
|
|
|
+ // add a file that is already localized by different job
|
|
|
+ DistributedCache.addCacheFile(secondCacheFile.toUri(), conf2);
|
|
|
+ // add a file that is never localized
|
|
|
+ DistributedCache.addCacheFile(thirdCacheFile.toUri(), conf2);
|
|
|
+
|
|
|
+ TrackerDistributedCacheManager.determineTimestamps(conf2);
|
|
|
+
|
|
|
+ // Task localizing for second job
|
|
|
+ // localization for the "firstCacheFile" will fail.
|
|
|
+ handle = manager.newTaskDistributedCacheManager(conf2);
|
|
|
+ Throwable th = null;
|
|
|
+ try {
|
|
|
+ handle.setup(localDirAllocator, workDir, TaskTracker
|
|
|
+ .getDistributedCacheDir(userName));
|
|
|
+ } catch (IOException e) {
|
|
|
+ th = e;
|
|
|
+ LOG.info("Exception during setup", e);
|
|
|
+ }
|
|
|
+ assertNotNull(th);
|
|
|
+ assertTrue(th.getMessage().contains("fake fail"));
|
|
|
+ handle.release();
|
|
|
+ th = null;
|
|
|
+ for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
|
|
|
+ try {
|
|
|
+ assertEquals(0, manager.getReferenceCount(c.uri, conf2, c.timestamp));
|
|
|
+ } catch (IOException ie) {
|
|
|
+ th = ie;
|
|
|
+ LOG.info("Exception getting reference count for " + c.uri, ie);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ assertNotNull(th);
|
|
|
+ assertTrue(th.getMessage().contains(thirdCacheFile.getName()));
|
|
|
+ fs.delete(thirdCacheFile, false);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Check proper permissions on the cache files
|
|
|
*
|
|
@@ -180,6 +295,9 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
|
|
|
/** test delete cache */
|
|
|
public void testDeleteCache() throws Exception {
|
|
|
+ if (!canRun()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
TrackerDistributedCacheManager manager =
|
|
|
new TrackerDistributedCacheManager(conf);
|
|
|
FileSystem localfs = FileSystem.getLocal(conf);
|
|
@@ -204,6 +322,9 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
}
|
|
|
|
|
|
public void testFileSystemOtherThanDefault() throws Exception {
|
|
|
+ if (!canRun()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
TrackerDistributedCacheManager manager =
|
|
|
new TrackerDistributedCacheManager(conf);
|
|
|
conf.set("fs.fakefile.impl", conf.get("fs.file.impl"));
|
|
@@ -262,6 +383,9 @@ public class TestTrackerDistributedCacheManager extends TestCase {
|
|
|
}
|
|
|
|
|
|
public void testFreshness() throws Exception {
|
|
|
+ if (!canRun()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
Configuration myConf = new Configuration(conf);
|
|
|
myConf.set("fs.default.name", "refresh:///");
|
|
|
myConf.setClass("fs.refresh.impl", FakeFileSystem.class, FileSystem.class);
|