|
@@ -41,6 +41,7 @@ import org.apache.hadoop.FailingMapper;
|
|
import org.apache.hadoop.RandomTextWriterJob;
|
|
import org.apache.hadoop.RandomTextWriterJob;
|
|
import org.apache.hadoop.RandomTextWriterJob.RandomInputFormat;
|
|
import org.apache.hadoop.RandomTextWriterJob.RandomInputFormat;
|
|
import org.apache.hadoop.SleepJob;
|
|
import org.apache.hadoop.SleepJob;
|
|
|
|
+import org.apache.hadoop.SleepJob.SleepMapper;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
@@ -98,6 +99,7 @@ public class TestMRJobs {
|
|
private static final EnumSet<RMAppState> TERMINAL_RM_APP_STATES =
|
|
private static final EnumSet<RMAppState> TERMINAL_RM_APP_STATES =
|
|
EnumSet.of(RMAppState.FINISHED, RMAppState.FAILED, RMAppState.KILLED);
|
|
EnumSet.of(RMAppState.FINISHED, RMAppState.FAILED, RMAppState.KILLED);
|
|
private static final int NUM_NODE_MGRS = 3;
|
|
private static final int NUM_NODE_MGRS = 3;
|
|
|
|
+ private static final String TEST_IO_SORT_MB = "11";
|
|
|
|
|
|
protected static MiniMRYarnCluster mrCluster;
|
|
protected static MiniMRYarnCluster mrCluster;
|
|
protected static MiniDFSCluster dfsCluster;
|
|
protected static MiniDFSCluster dfsCluster;
|
|
@@ -205,6 +207,38 @@ public class TestMRJobs {
|
|
// JobStatus?)--compare against MRJobConfig.JOB_UBERTASK_ENABLE value
|
|
// JobStatus?)--compare against MRJobConfig.JOB_UBERTASK_ENABLE value
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Test(timeout = 300000)
|
|
|
|
+ public void testJobClassloader() throws IOException, InterruptedException,
|
|
|
|
+ ClassNotFoundException {
|
|
|
|
+ LOG.info("\n\n\nStarting testJobClassloader().");
|
|
|
|
+
|
|
|
|
+ if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
|
|
|
|
+ LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
|
|
|
|
+ + " not found. Not running test.");
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ final Configuration sleepConf = new Configuration(mrCluster.getConfig());
|
|
|
|
+ // set master address to local to test that local mode applied iff framework == local
|
|
|
|
+ sleepConf.set(MRConfig.MASTER_ADDRESS, "local");
|
|
|
|
+ sleepConf.setBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, true);
|
|
|
|
+ sleepConf.set(MRJobConfig.IO_SORT_MB, TEST_IO_SORT_MB);
|
|
|
|
+ sleepConf.set(MRJobConfig.MR_AM_LOG_LEVEL, Level.ALL.toString());
|
|
|
|
+ sleepConf.set(MRJobConfig.MAP_LOG_LEVEL, Level.ALL.toString());
|
|
|
|
+ sleepConf.set(MRJobConfig.REDUCE_LOG_LEVEL, Level.ALL.toString());
|
|
|
|
+ sleepConf.set(MRJobConfig.MAP_JAVA_OPTS, "-verbose:class");
|
|
|
|
+ final SleepJob sleepJob = new SleepJob();
|
|
|
|
+ sleepJob.setConf(sleepConf);
|
|
|
|
+ final Job job = sleepJob.createJob(1, 1, 10, 1, 10, 1);
|
|
|
|
+ job.setMapperClass(ConfVerificationMapper.class);
|
|
|
|
+ job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
|
|
|
|
+ job.setJarByClass(SleepJob.class);
|
|
|
|
+ job.setMaxMapAttempts(1); // speed up failures
|
|
|
|
+ job.submit();
|
|
|
|
+ boolean succeeded = job.waitForCompletion(true);
|
|
|
|
+ Assert.assertTrue("Job status: " + job.getStatus().getFailureInfo(),
|
|
|
|
+ succeeded);
|
|
|
|
+ }
|
|
|
|
+
|
|
protected void verifySleepJobCounters(Job job) throws InterruptedException,
|
|
protected void verifySleepJobCounters(Job job) throws InterruptedException,
|
|
IOException {
|
|
IOException {
|
|
Counters counters = job.getCounters();
|
|
Counters counters = job.getCounters();
|
|
@@ -795,4 +829,18 @@ public class TestMRJobs {
|
|
jos.closeEntry();
|
|
jos.closeEntry();
|
|
jarFile.delete();
|
|
jarFile.delete();
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ public static class ConfVerificationMapper extends SleepMapper {
|
|
|
|
+ @Override
|
|
|
|
+ protected void setup(Context context)
|
|
|
|
+ throws IOException, InterruptedException {
|
|
|
|
+ super.setup(context);
|
|
|
|
+ final Configuration conf = context.getConfiguration();
|
|
|
|
+ final String ioSortMb = conf.get(MRJobConfig.IO_SORT_MB);
|
|
|
|
+ if (!TEST_IO_SORT_MB.equals(ioSortMb)) {
|
|
|
|
+ throw new IOException("io.sort.mb expected: " + TEST_IO_SORT_MB
|
|
|
|
+ + ", actual: " + ioSortMb);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|