@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapreduce.v2;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -26,6 +27,7 @@ import java.net.URI;
 import java.security.PrivilegedExceptionAction;
 import java.util.jar.JarOutputStream;
 import java.util.zip.ZipEntry;
+import org.apache.commons.io.FileUtils;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -66,6 +68,7 @@ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.JarFinder;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -402,13 +405,14 @@ public class TestMRJobs {
       Path[] archives = context.getLocalCacheArchives();
       FileSystem fs = LocalFileSystem.get(conf);
 
-      // Check that 3(2+ appjar) files and 2 archives are present
-      Assert.assertEquals(3, files.length);
+      // Check that 4 (2 + appjar + DistributedCacheChecker jar) files
+      // and 2 archives are present
+      Assert.assertEquals(4, files.length);
       Assert.assertEquals(2, archives.length);
 
       // Check lengths of the files
-      Assert.assertEquals(1, fs.getFileStatus(files[0]).getLen());
-      Assert.assertTrue(fs.getFileStatus(files[1]).getLen() > 1);
+      Assert.assertEquals(1, fs.getFileStatus(files[1]).getLen());
+      Assert.assertTrue(fs.getFileStatus(files[2]).getLen() > 1);
 
       // Check extraction of the archive
       Assert.assertTrue(fs.exists(new Path(archives[0],
@@ -424,11 +428,23 @@
       Assert.assertNotNull(cl.getResource("distributed.jar.inside2"));
       Assert.assertNotNull(cl.getResource("distributed.jar.inside3"));
       Assert.assertNotNull(cl.getResource("distributed.jar.inside4"));
+      // The Job Jar should have been extracted to a folder named "job.jar" and
+      // added to the classpath; the two jar files in the lib folder in the Job
+      // Jar should have also been added to the classpath
+      Assert.assertNotNull(cl.getResource("job.jar/"));
+      Assert.assertNotNull(cl.getResource("job.jar/lib/lib1.jar"));
+      Assert.assertNotNull(cl.getResource("job.jar/lib/lib2.jar"));
 
       // Check that the symlink for the renaming was created in the cwd;
       File symlinkFile = new File("distributed.first.symlink");
       Assert.assertTrue(symlinkFile.exists());
       Assert.assertEquals(1, symlinkFile.length());
+
+      // Check that the symlink for the Job Jar was created in the cwd and
+      // points to the extracted directory
+      File jobJarDir = new File("job.jar");
+      Assert.assertTrue(FileUtils.isSymlink(jobJarDir));
+      Assert.assertTrue(jobJarDir.isDirectory());
     }
   }
 
@@ -451,7 +467,15 @@
     makeJar(new Path(TEST_ROOT_DIR, "distributed.fourth.jar"), 4);
 
     Job job = Job.getInstance(mrCluster.getConfig());
-    job.setJarByClass(DistributedCacheChecker.class);
+
+    // Set the job jar to a new "dummy" jar so we can check that it's extracted
+    // properly
+    job.setJar(makeJobJarWithLib(TEST_ROOT_DIR.toUri().toString()));
+    // Because the job jar is a "dummy" jar, we need to include the jar with
+    // DistributedCacheChecker or it won't be able to find it
+    job.addFileToClassPath(new Path(
+        JarFinder.getJar(DistributedCacheChecker.class)));
+
     job.setMapperClass(DistributedCacheChecker.class);
     job.setOutputFormatClass(NullOutputFormat.class);
 
@@ -497,4 +521,45 @@
     localFs.setPermission(p, new FsPermission("700"));
     return p;
   }
+
+  private String makeJobJarWithLib(String testDir) throws FileNotFoundException,
+      IOException {
+    Path jobJarPath = new Path(testDir, "thejob.jar");
+    FileOutputStream fos =
+        new FileOutputStream(new File(jobJarPath.toUri().getPath()));
+    JarOutputStream jos = new JarOutputStream(fos);
+    // Have to put in real jar files or it will complain
+    createAndAddJarToJar(jos, new File(
+        new Path(testDir, "lib1.jar").toUri().getPath()));
+    createAndAddJarToJar(jos, new File(
+        new Path(testDir, "lib2.jar").toUri().getPath()));
+    jos.close();
+    localFs.setPermission(jobJarPath, new FsPermission("700"));
+    return jobJarPath.toUri().toString();
+  }
+
+  private void createAndAddJarToJar(JarOutputStream jos, File jarFile)
+      throws FileNotFoundException, IOException {
+    FileOutputStream fos2 = new FileOutputStream(jarFile);
+    JarOutputStream jos2 = new JarOutputStream(fos2);
+    // Have to have at least one entry or it will complain
+    ZipEntry ze = new ZipEntry("lib1.inside");
+    jos2.putNextEntry(ze);
+    jos2.closeEntry();
+    jos2.close();
+    ze = new ZipEntry("lib/" + jarFile.getName());
+    jos.putNextEntry(ze);
+    FileInputStream in = new FileInputStream(jarFile);
+    byte buf[] = new byte[1024];
+    int numRead;
+    do {
+      numRead = in.read(buf);
+      if (numRead >= 0) {
+        jos.write(buf, 0, numRead);
+      }
+    } while (numRead != -1);
+    in.close();
+    jos.closeEntry();
+    jarFile.delete();
+  }
 }