|
@@ -55,14 +55,10 @@ import org.apache.hadoop.fs.RemoteIterator;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
|
import org.apache.hadoop.io.IOUtils;
|
|
|
-import org.apache.hadoop.io.IntWritable;
|
|
|
import org.apache.hadoop.io.LongWritable;
|
|
|
import org.apache.hadoop.io.NullWritable;
|
|
|
import org.apache.hadoop.io.Text;
|
|
|
-import org.apache.hadoop.mapred.JobClient;
|
|
|
import org.apache.hadoop.mapred.JobConf;
|
|
|
-import org.apache.hadoop.mapred.JobID;
|
|
|
-import org.apache.hadoop.mapred.RunningJob;
|
|
|
import org.apache.hadoop.mapred.TaskLog;
|
|
|
import org.apache.hadoop.mapreduce.Counters;
|
|
|
import org.apache.hadoop.mapreduce.Job;
|
|
@@ -114,7 +110,6 @@ public class TestMRJobs {
|
|
|
EnumSet.of(RMAppState.FINISHED, RMAppState.FAILED, RMAppState.KILLED);
|
|
|
private static final int NUM_NODE_MGRS = 3;
|
|
|
private static final String TEST_IO_SORT_MB = "11";
|
|
|
- private static final String TEST_GROUP_MAX = "200";
|
|
|
|
|
|
private static final int DEFAULT_REDUCES = 2;
|
|
|
protected int numSleepReducers = DEFAULT_REDUCES;
|
|
@@ -307,58 +302,31 @@ public class TestMRJobs {
|
|
|
}
|
|
|
|
|
|
@Test(timeout = 300000)
|
|
|
- public void testConfVerificationWithClassloader() throws Exception {
|
|
|
- testConfVerification(true, false, false, false);
|
|
|
- }
|
|
|
-
|
|
|
- @Test(timeout = 300000)
|
|
|
- public void testConfVerificationWithClassloaderCustomClasses()
|
|
|
- throws Exception {
|
|
|
- testConfVerification(true, true, false, false);
|
|
|
- }
|
|
|
-
|
|
|
- @Test(timeout = 300000)
|
|
|
- public void testConfVerificationWithOutClassloader() throws Exception {
|
|
|
- testConfVerification(false, false, false, false);
|
|
|
- }
|
|
|
-
|
|
|
- @Test(timeout = 300000)
|
|
|
- public void testConfVerificationWithJobClient() throws Exception {
|
|
|
- testConfVerification(false, false, true, false);
|
|
|
+ public void testJobClassloader() throws IOException, InterruptedException,
|
|
|
+ ClassNotFoundException {
|
|
|
+ testJobClassloader(false);
|
|
|
}
|
|
|
|
|
|
@Test(timeout = 300000)
|
|
|
- public void testConfVerificationWithJobClientLocal() throws Exception {
|
|
|
- testConfVerification(false, false, true, true);
|
|
|
+ public void testJobClassloaderWithCustomClasses() throws IOException,
|
|
|
+ InterruptedException, ClassNotFoundException {
|
|
|
+ testJobClassloader(true);
|
|
|
}
|
|
|
|
|
|
- private void testConfVerification(boolean useJobClassLoader,
|
|
|
- boolean useCustomClasses, boolean useJobClientForMonitring,
|
|
|
- boolean useLocal) throws Exception {
|
|
|
- LOG.info("\n\n\nStarting testConfVerification()"
|
|
|
- + " jobClassloader=" + useJobClassLoader
|
|
|
- + " customClasses=" + useCustomClasses
|
|
|
- + " jobClient=" + useJobClientForMonitring
|
|
|
- + " localMode=" + useLocal);
|
|
|
+ private void testJobClassloader(boolean useCustomClasses) throws IOException,
|
|
|
+ InterruptedException, ClassNotFoundException {
|
|
|
+ LOG.info("\n\n\nStarting testJobClassloader()"
|
|
|
+ + " useCustomClasses=" + useCustomClasses);
|
|
|
|
|
|
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
|
|
|
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
|
|
|
+ " not found. Not running test.");
|
|
|
return;
|
|
|
}
|
|
|
- final Configuration clusterConfig;
|
|
|
- if (useLocal) {
|
|
|
- clusterConfig = new Configuration();
|
|
|
- conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
|
|
|
- } else {
|
|
|
- clusterConfig = mrCluster.getConfig();
|
|
|
- }
|
|
|
- final JobClient jc = new JobClient(clusterConfig);
|
|
|
- final Configuration sleepConf = new Configuration(clusterConfig);
|
|
|
+ final Configuration sleepConf = new Configuration(mrCluster.getConfig());
|
|
|
// set master address to local to test that local mode applied iff framework == local
|
|
|
sleepConf.set(MRConfig.MASTER_ADDRESS, "local");
|
|
|
- sleepConf.setBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER,
|
|
|
- useJobClassLoader);
|
|
|
+ sleepConf.setBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, true);
|
|
|
if (useCustomClasses) {
|
|
|
// to test AM loading user classes such as output format class, we want
|
|
|
// to blacklist them from the system classes (they need to be prepended
|
|
@@ -376,7 +344,6 @@ public class TestMRJobs {
|
|
|
sleepConf.set(MRJobConfig.MAP_LOG_LEVEL, Level.ALL.toString());
|
|
|
sleepConf.set(MRJobConfig.REDUCE_LOG_LEVEL, Level.ALL.toString());
|
|
|
sleepConf.set(MRJobConfig.MAP_JAVA_OPTS, "-verbose:class");
|
|
|
- sleepConf.set(MRJobConfig.COUNTER_GROUPS_MAX_KEY, TEST_GROUP_MAX);
|
|
|
final SleepJob sleepJob = new SleepJob();
|
|
|
sleepJob.setConf(sleepConf);
|
|
|
final Job job = sleepJob.createJob(1, 1, 10, 1, 10, 1);
|
|
@@ -394,26 +361,7 @@ public class TestMRJobs {
|
|
|
jobConf.setBoolean(MRJobConfig.MAP_SPECULATIVE, true);
|
|
|
}
|
|
|
job.submit();
|
|
|
- final boolean succeeded;
|
|
|
- if (useJobClientForMonitring && !useLocal) {
|
|
|
- // We can't use getJobID in useLocal case because JobClient and Job
|
|
|
- // point to different instances of LocalJobRunner
|
|
|
- //
|
|
|
- final JobID mapredJobID = JobID.downgrade(job.getJobID());
|
|
|
- RunningJob runningJob = null;
|
|
|
- do {
|
|
|
- Thread.sleep(10);
|
|
|
- runningJob = jc.getJob(mapredJobID);
|
|
|
- } while (runningJob == null);
|
|
|
- Assert.assertEquals("Unexpected RunningJob's "
|
|
|
- + MRJobConfig.COUNTER_GROUPS_MAX_KEY,
|
|
|
- TEST_GROUP_MAX, runningJob.getConfiguration()
|
|
|
- .get(MRJobConfig.COUNTER_GROUPS_MAX_KEY));
|
|
|
- runningJob.waitForCompletion();
|
|
|
- succeeded = runningJob.isSuccessful();
|
|
|
- } else {
|
|
|
- succeeded = job.waitForCompletion(true);
|
|
|
- }
|
|
|
+ boolean succeeded = job.waitForCompletion(true);
|
|
|
Assert.assertTrue("Job status: " + job.getStatus().getFailureInfo(),
|
|
|
succeeded);
|
|
|
}
|
|
@@ -1194,14 +1142,5 @@ public class TestMRJobs {
|
|
|
+ ", actual: " + ioSortMb);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- @Override
|
|
|
- public void map(IntWritable key, IntWritable value, Context context) throws IOException, InterruptedException {
|
|
|
- super.map(key, value, context);
|
|
|
- for (int i = 0; i < 100; i++) {
|
|
|
- context.getCounter("testCounterGroup-" + i,
|
|
|
- "testCounter").increment(1);
|
|
|
- }
|
|
|
- }
|
|
|
}
|
|
|
}
|