|
@@ -121,20 +121,11 @@ public class TestJobTrackerRestart extends TestCase {
|
|
|
mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover",
|
|
|
false);
|
|
|
|
|
|
- // Wait for a minute before submitting a job
|
|
|
- UtilsForTests.waitFor(60 * 1000);
|
|
|
-
|
|
|
mr.startJobTracker();
|
|
|
|
|
|
- // Signal the tasks
|
|
|
- UtilsForTests.signalTasks(dfs, fileSys, true, getMapSignalFile(shareDir),
|
|
|
- getReduceSignalFile(shareDir));
|
|
|
-
|
|
|
// Wait for the JT to be ready
|
|
|
UtilsForTests.waitForJobTracker(jobClient);
|
|
|
|
|
|
- UtilsForTests.waitTillDone(jobClient);
|
|
|
-
|
|
|
// The submitted job should not exist
|
|
|
assertTrue("Submitted job was detected with recovery disabled",
|
|
|
UtilsForTests.getJobStatus(jobClient, id) == null);
|
|
@@ -143,14 +134,13 @@ public class TestJobTrackerRestart extends TestCase {
|
|
|
/** Tests a job on jobtracker with restart-recovery turned on.
|
|
|
* Preparation :
|
|
|
* - Configure a job with
|
|
|
- * - num-maps : 50
|
|
|
+ * - num-maps : 24
|
|
|
* - num-reducers : 1
|
|
|
* - Configure the cluster to run 1 reducer
|
|
|
* - Lower the history file block size and buffer
|
|
|
*
|
|
|
* Wait for the job to complete 50%. Note that all the job is configured to
|
|
|
- * use {@link HalfWaitingMapper} and {@link WaitingReducer}. So job will
|
|
|
- * eventually wait on 50%
|
|
|
+ * use {@link HalfWaitingMapper}. So job will eventually wait on 50%
|
|
|
*
|
|
|
* Make a note of the following things
|
|
|
* - Task completion events
|
|
@@ -191,7 +181,7 @@ public class TestJobTrackerRestart extends TestCase {
|
|
|
// Ideally the tracker should SYNC with the new/restarted jobtracker
|
|
|
|
|
|
FileSystem fileSys = dfs.getFileSystem();
|
|
|
- final int numMaps = 50;
|
|
|
+ final int numMaps = 24;
|
|
|
final int numReducers = 1;
|
|
|
|
|
|
|
|
@@ -224,7 +214,7 @@ public class TestJobTrackerRestart extends TestCase {
|
|
|
mr.getMapTaskCompletionEventsUpdates(0, id, numMaps)
|
|
|
.getMapTaskCompletionEvents();
|
|
|
if (trackerEvents.length < numMaps / 2) {
|
|
|
- UtilsForTests.waitFor(1000);
|
|
|
+ UtilsForTests.waitFor(100);
|
|
|
} else {
|
|
|
break;
|
|
|
}
|
|
@@ -242,9 +232,6 @@ public class TestJobTrackerRestart extends TestCase {
|
|
|
mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover",
|
|
|
true);
|
|
|
|
|
|
- // Wait for a minute before submitting a job
|
|
|
- UtilsForTests.waitFor(60 * 1000);
|
|
|
-
|
|
|
mr.startJobTracker();
|
|
|
|
|
|
// Signal the map tasks
|
|
@@ -274,7 +261,7 @@ public class TestJobTrackerRestart extends TestCase {
|
|
|
mr.getMapTaskCompletionEventsUpdates(0, id, 2 * numMaps)
|
|
|
.getMapTaskCompletionEvents();
|
|
|
if (trackerEvents.length < jtEvents.length) {
|
|
|
- UtilsForTests.waitFor(1000);
|
|
|
+ UtilsForTests.waitFor(100);
|
|
|
} else {
|
|
|
break;
|
|
|
}
|
|
@@ -429,10 +416,11 @@ public class TestJobTrackerRestart extends TestCase {
|
|
|
// Make sure that jobhistory leads to a proper job restart
|
|
|
// So keep the blocksize and the buffer size small
|
|
|
JobConf jtConf = new JobConf();
|
|
|
- jtConf.set("mapred.jobtracker.job.history.block.size", "1024");
|
|
|
- jtConf.set("mapred.jobtracker.job.history.buffer.size", "1024");
|
|
|
+ jtConf.set("mapred.jobtracker.job.history.block.size", "64");
|
|
|
+ jtConf.setInt("mapred.tasktracker.map.tasks.maximum", 1);
|
|
|
jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
|
|
|
- jtConf.setLong("mapred.tasktracker.expiry.interval", 25 * 1000);
|
|
|
+ jtConf.setLong("mapred.tasktracker.expiry.interval", 5000);
|
|
|
+ jtConf.setLong("mapred.job.reuse.jvm.num.tasks", -1);
|
|
|
jtConf.setBoolean("mapred.acls.enabled", true);
|
|
|
// get the user group info
|
|
|
UserGroupInformation ugi = UserGroupInformation.getCurrentUGI();
|