|
@@ -41,10 +41,10 @@ import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
import org.apache.hadoop.fs.FileContext;
|
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
-import org.apache.hadoop.fs.LocalFileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.RemoteIterator;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
+import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
|
import org.apache.hadoop.io.LongWritable;
|
|
|
import org.apache.hadoop.io.NullWritable;
|
|
|
import org.apache.hadoop.io.Text;
|
|
@@ -80,15 +80,24 @@ public class TestMRJobs {
|
|
|
private static final Log LOG = LogFactory.getLog(TestMRJobs.class);
|
|
|
|
|
|
protected static MiniMRYarnCluster mrCluster;
|
|
|
+ protected static MiniDFSCluster dfsCluster;
|
|
|
|
|
|
private static Configuration conf = new Configuration();
|
|
|
private static FileSystem localFs;
|
|
|
+ private static FileSystem remoteFs;
|
|
|
static {
|
|
|
try {
|
|
|
localFs = FileSystem.getLocal(conf);
|
|
|
} catch (IOException io) {
|
|
|
throw new RuntimeException("problem getting local fs", io);
|
|
|
}
|
|
|
+ try {
|
|
|
+ dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
|
|
|
+ .format(true).racks(null).build();
|
|
|
+ remoteFs = dfsCluster.getFileSystem();
|
|
|
+ } catch (IOException io) {
|
|
|
+ throw new RuntimeException("problem starting mini dfs cluster", io);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private static Path TEST_ROOT_DIR = new Path("target",
|
|
@@ -107,6 +116,8 @@ public class TestMRJobs {
|
|
|
if (mrCluster == null) {
|
|
|
mrCluster = new MiniMRYarnCluster(TestMRJobs.class.getName(), 3);
|
|
|
Configuration conf = new Configuration();
|
|
|
+ conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
|
|
|
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/apps_staging_dir");
|
|
|
mrCluster.init(conf);
|
|
|
mrCluster.start();
|
|
|
}
|
|
@@ -123,6 +134,10 @@ public class TestMRJobs {
|
|
|
mrCluster.stop();
|
|
|
mrCluster = null;
|
|
|
}
|
|
|
+ if (dfsCluster != null) {
|
|
|
+ dfsCluster.shutdown();
|
|
|
+ dfsCluster = null;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -403,7 +418,6 @@ public class TestMRJobs {
|
|
|
Configuration conf = context.getConfiguration();
|
|
|
Path[] files = context.getLocalCacheFiles();
|
|
|
Path[] archives = context.getLocalCacheArchives();
|
|
|
- FileSystem fs = LocalFileSystem.get(conf);
|
|
|
|
|
|
// Check that 4 (2 + appjar + DistrubutedCacheChecker jar) files
|
|
|
// and 2 archives are present
|
|
@@ -411,13 +425,13 @@ public class TestMRJobs {
|
|
|
Assert.assertEquals(2, archives.length);
|
|
|
|
|
|
// Check lengths of the files
|
|
|
- Assert.assertEquals(1, fs.getFileStatus(files[1]).getLen());
|
|
|
- Assert.assertTrue(fs.getFileStatus(files[2]).getLen() > 1);
|
|
|
+ Assert.assertEquals(1, localFs.getFileStatus(files[1]).getLen());
|
|
|
+ Assert.assertTrue(localFs.getFileStatus(files[2]).getLen() > 1);
|
|
|
|
|
|
// Check extraction of the archive
|
|
|
- Assert.assertTrue(fs.exists(new Path(archives[0],
|
|
|
+ Assert.assertTrue(localFs.exists(new Path(archives[0],
|
|
|
"distributed.jar.inside3")));
|
|
|
- Assert.assertTrue(fs.exists(new Path(archives[1],
|
|
|
+ Assert.assertTrue(localFs.exists(new Path(archives[1],
|
|
|
"distributed.jar.inside4")));
|
|
|
|
|
|
// Check the class loaders
|
|
@@ -448,8 +462,7 @@ public class TestMRJobs {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
- public void testDistributedCache() throws Exception {
|
|
|
+ public void _testDistributedCache(String jobJarPath) throws Exception {
|
|
|
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
|
|
|
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
|
|
|
+ " not found. Not running test.");
|
|
@@ -470,11 +483,13 @@ public class TestMRJobs {
|
|
|
|
|
|
// Set the job jar to a new "dummy" jar so we can check that its extracted
|
|
|
// properly
|
|
|
- job.setJar(makeJobJarWithLib(TEST_ROOT_DIR.toUri().toString()));
|
|
|
+ job.setJar(jobJarPath);
|
|
|
// Because the job jar is a "dummy" jar, we need to include the jar with
|
|
|
// DistributedCacheChecker or it won't be able to find it
|
|
|
- job.addFileToClassPath(new Path(
|
|
|
- JarFinder.getJar(DistributedCacheChecker.class)));
|
|
|
+ Path distributedCacheCheckerJar = new Path(
|
|
|
+ JarFinder.getJar(DistributedCacheChecker.class));
|
|
|
+ job.addFileToClassPath(distributedCacheCheckerJar.makeQualified(
|
|
|
+ localFs.getUri(), distributedCacheCheckerJar.getParent()));
|
|
|
|
|
|
job.setMapperClass(DistributedCacheChecker.class);
|
|
|
job.setOutputFormatClass(NullOutputFormat.class);
|
|
@@ -484,7 +499,9 @@ public class TestMRJobs {
|
|
|
job.addCacheFile(
|
|
|
new URI(first.toUri().toString() + "#distributed.first.symlink"));
|
|
|
job.addFileToClassPath(second);
|
|
|
- job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
|
|
|
+ // The AppMaster jar itself
|
|
|
+ job.addFileToClassPath(
|
|
|
+ APP_JAR.makeQualified(localFs.getUri(), APP_JAR.getParent()));
|
|
|
job.addArchiveToClassPath(third);
|
|
|
job.addCacheArchive(fourth.toUri());
|
|
|
job.setMaxMapAttempts(1); // speed up failures
|
|
@@ -497,6 +514,23 @@ public class TestMRJobs {
|
|
|
" but didn't Match Job ID " + jobId ,
|
|
|
trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
|
|
|
}
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testDistributedCache() throws Exception {
|
|
|
+ // Test with a local (file:///) Job Jar
|
|
|
+ Path localJobJarPath = makeJobJarWithLib(TEST_ROOT_DIR.toUri().toString());
|
|
|
+ _testDistributedCache(localJobJarPath.toUri().toString());
|
|
|
+
|
|
|
+ // Test with a remote (hdfs://) Job Jar
|
|
|
+ Path remoteJobJarPath = new Path(remoteFs.getUri().toString() + "/",
|
|
|
+ localJobJarPath.getName());
|
|
|
+ remoteFs.moveFromLocalFile(localJobJarPath, remoteJobJarPath);
|
|
|
+ File localJobJarFile = new File(localJobJarPath.toUri().toString());
|
|
|
+ if (localJobJarFile.exists()) { // just to make sure
|
|
|
+ localJobJarFile.delete();
|
|
|
+ }
|
|
|
+ _testDistributedCache(remoteJobJarPath.toUri().toString());
|
|
|
+ }
|
|
|
|
|
|
private Path createTempFile(String filename, String contents)
|
|
|
throws IOException {
|
|
@@ -522,7 +556,7 @@ public class TestMRJobs {
|
|
|
return p;
|
|
|
}
|
|
|
|
|
|
- private String makeJobJarWithLib(String testDir) throws FileNotFoundException,
|
|
|
+ private Path makeJobJarWithLib(String testDir) throws FileNotFoundException,
|
|
|
IOException{
|
|
|
Path jobJarPath = new Path(testDir, "thejob.jar");
|
|
|
FileOutputStream fos =
|
|
@@ -535,7 +569,7 @@ public class TestMRJobs {
|
|
|
new Path(testDir, "lib2.jar").toUri().getPath()));
|
|
|
jos.close();
|
|
|
localFs.setPermission(jobJarPath, new FsPermission("700"));
|
|
|
- return jobJarPath.toUri().toString();
|
|
|
+ return jobJarPath;
|
|
|
}
|
|
|
|
|
|
private void createAndAddJarToJar(JarOutputStream jos, File jarFile)
|