|
@@ -24,7 +24,11 @@ import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
import org.apache.hadoop.mapred.JobConf;
|
|
|
+import org.apache.hadoop.mapreduce.InputSplit;
|
|
|
import org.apache.hadoop.mapreduce.Job;
|
|
|
+import org.apache.hadoop.mapreduce.JobContext;
|
|
|
+import org.apache.hadoop.mapreduce.JobID;
|
|
|
+import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.tools.rumen.JobStory;
|
|
|
import org.apache.hadoop.util.ToolRunner;
|
|
|
import org.apache.log4j.Level;
|
|
@@ -34,6 +38,7 @@ import org.junit.Test;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.List;
|
|
|
import java.util.concurrent.BlockingQueue;
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
@@ -126,6 +131,31 @@ public class TestSleepJob {
|
|
|
System.out.println(" Replay ended at " + System.currentTimeMillis());
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testRandomLocationSubmit() throws Exception {
|
|
|
+ policy = GridmixJobSubmissionPolicy.STRESS;
|
|
|
+ System.out.println(" Random locations started at " + System.currentTimeMillis());
|
|
|
+ doSubmission("-D"+JobCreator.SLEEPJOB_RANDOM_LOCATIONS+"=3");
|
|
|
+ System.out.println(" Random locations ended at " + System.currentTimeMillis());
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testMapTasksOnlySubmit() throws Exception {
|
|
|
+ policy = GridmixJobSubmissionPolicy.STRESS;
|
|
|
+ System.out.println(" Map tasks only at " + System.currentTimeMillis());
|
|
|
+ doSubmission("-D"+SleepJob.SLEEPJOB_MAPTASK_ONLY+"=true");
|
|
|
+ System.out.println(" Map tasks only ended at " + System.currentTimeMillis());
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testLimitTaskSleepTimeSubmit() throws Exception {
|
|
|
+ policy = GridmixJobSubmissionPolicy.STRESS;
|
|
|
+ System.out.println(" Limit sleep time only at " + System.currentTimeMillis());
|
|
|
+ doSubmission("-D" + SleepJob.GRIDMIX_SLEEP_MAX_MAP_TIME + "=100", "-D"
|
|
|
+ + SleepJob.GRIDMIX_SLEEP_MAX_REDUCE_TIME + "=200");
|
|
|
+ System.out.println(" Limit sleep time ended at " + System.currentTimeMillis());
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
public void testStressSubmit() throws Exception {
|
|
|
policy = GridmixJobSubmissionPolicy.STRESS;
|
|
@@ -141,22 +171,83 @@ public class TestSleepJob {
|
|
|
doSubmission();
|
|
|
System.out.println("Serial ended at " + System.currentTimeMillis());
|
|
|
}
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testRandomLocation() throws Exception {
|
|
|
+ UserGroupInformation ugi = UserGroupInformation.getLoginUser();
|
|
|
+ // testRandomLocation(0, 10, ugi);
|
|
|
+ testRandomLocation(1, 10, ugi);
|
|
|
+ testRandomLocation(2, 10, ugi);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void testRandomLocation(int locations, int njobs, UserGroupInformation ugi) throws Exception {
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ conf.setInt(JobCreator.SLEEPJOB_RANDOM_LOCATIONS, locations);
|
|
|
+ DebugJobProducer jobProducer = new DebugJobProducer(njobs, conf);
|
|
|
+ JobConf jconf = GridmixTestUtils.mrCluster.createJobConf(new JobConf(conf));
|
|
|
+ JobStory story;
|
|
|
+ int seq=1;
|
|
|
+ while ((story = jobProducer.getNextJob()) != null) {
|
|
|
+ GridmixJob gridmixJob = JobCreator.SLEEPJOB.createGridmixJob(jconf, 0,
|
|
|
+ story, new Path("ignored"), ugi, seq++);
|
|
|
+ gridmixJob.buildSplits(null);
|
|
|
+ List<InputSplit> splits = new SleepJob.SleepInputFormat()
|
|
|
+ .getSplits(gridmixJob.getJob());
|
|
|
+ for (InputSplit split : splits) {
|
|
|
+ assertEquals(locations, split.getLocations().length);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testMapTasksOnlySleepJobs()
|
|
|
+ throws Exception {
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ conf.setBoolean(SleepJob.SLEEPJOB_MAPTASK_ONLY, true);
|
|
|
+ DebugJobProducer jobProducer = new DebugJobProducer(5, conf);
|
|
|
+ JobConf jconf = GridmixTestUtils.mrCluster.createJobConf(new JobConf(conf));
|
|
|
+ UserGroupInformation ugi = UserGroupInformation.getLoginUser();
|
|
|
+ JobStory story;
|
|
|
+ int seq = 1;
|
|
|
+ while ((story = jobProducer.getNextJob()) != null) {
|
|
|
+ GridmixJob gridmixJob = JobCreator.SLEEPJOB.createGridmixJob(jconf, 0,
|
|
|
+ story, new Path("ignored"), ugi, seq++);
|
|
|
+ gridmixJob.buildSplits(null);
|
|
|
+ Job job = gridmixJob.call();
|
|
|
+ assertEquals(0, job.getNumReduceTasks());
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
-
|
|
|
- private void doSubmission() throws Exception {
|
|
|
+ private void doSubmission(String...optional) throws Exception {
|
|
|
final Path in = new Path("foo").makeQualified(GridmixTestUtils.dfs);
|
|
|
final Path out = GridmixTestUtils.DEST.makeQualified(GridmixTestUtils.dfs);
|
|
|
final Path root = new Path("/user");
|
|
|
Configuration conf = null;
|
|
|
try {
|
|
|
- final String[] argv = {"-D" + FilePool.GRIDMIX_MIN_FILE + "=0",
|
|
|
+ // required options
|
|
|
+ final String[] required = {"-D" + FilePool.GRIDMIX_MIN_FILE + "=0",
|
|
|
"-D" + Gridmix.GRIDMIX_OUT_DIR + "=" + out,
|
|
|
"-D" + Gridmix.GRIDMIX_USR_RSV + "=" + EchoUserResolver.class.getName(),
|
|
|
"-D" + JobCreator.GRIDMIX_JOB_TYPE + "=" + JobCreator.SLEEPJOB.name(),
|
|
|
- "-D" + SleepJob.GRIDMIX_SLEEP_INTERVAL +"=" +"10",
|
|
|
- "-generate",String.valueOf(GENDATA) + "m", in.toString(), "-"
|
|
|
- // ignored by DebugGridmix
|
|
|
+ "-D" + SleepJob.GRIDMIX_SLEEP_INTERVAL +"=" +"10"
|
|
|
};
|
|
|
+ // mandatory arguments
|
|
|
+ final String[] mandatory = {
|
|
|
+ "-generate",String.valueOf(GENDATA) + "m", in.toString(), "-"
|
|
|
+ // ignored by DebugGridmix
|
|
|
+ };
|
|
|
+
|
|
|
+ ArrayList<String> argv = new ArrayList<String>(required.length+optional.length+mandatory.length);
|
|
|
+ for (String s : required) {
|
|
|
+ argv.add(s);
|
|
|
+ }
|
|
|
+ for (String s : optional) {
|
|
|
+ argv.add(s);
|
|
|
+ }
|
|
|
+ for (String s : mandatory) {
|
|
|
+ argv.add(s);
|
|
|
+ }
|
|
|
+
|
|
|
DebugGridmix client = new DebugGridmix();
|
|
|
conf = new Configuration();
|
|
|
conf.setEnum(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY, policy);
|
|
@@ -165,7 +256,12 @@ public class TestSleepJob {
|
|
|
// allow synthetic users to create home directories
|
|
|
GridmixTestUtils.dfs.mkdirs(root, new FsPermission((short) 0777));
|
|
|
GridmixTestUtils.dfs.setPermission(root, new FsPermission((short) 0777));
|
|
|
- int res = ToolRunner.run(conf, client, argv);
|
|
|
+ String[] args = argv.toArray(new String[argv.size()]);
|
|
|
+ System.out.println("Command line arguments:");
|
|
|
+ for (int i=0; i<args.length; ++i) {
|
|
|
+ System.out.printf(" [%d] %s\n", i, args[i]);
|
|
|
+ }
|
|
|
+ int res = ToolRunner.run(conf, client, args);
|
|
|
assertEquals("Client exited with nonzero status", 0, res);
|
|
|
client.checkMonitor();
|
|
|
} catch (Exception e) {
|