|
@@ -80,170 +80,6 @@ public class TestJobTrackerRestart extends TestCase {
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * Tests multiple jobs on jobtracker with restart-recovery turned on.
|
|
|
|
- * Preparation :
|
|
|
|
- * - Configure 3 jobs as follows [format {prio, maps, reducers}]
|
|
|
|
- * - job1 : {normal, 50, 1}
|
|
|
|
- * - job2 : {low, 1, 1}
|
|
|
|
- * - job3 : {high, 1, 1}
|
|
|
|
- * - Configure the cluster to run 1 reducer
|
|
|
|
- * - Lower the history file block size and buffer
|
|
|
|
- *
|
|
|
|
- * Submit these 3 jobs but make sure that job1's priority is changed and job1
|
|
|
|
- * is RUNNING before submitting other jobs
|
|
|
|
- * The order in which the jobs will be executed will be job1, job3 and job2.
|
|
|
|
- *
|
|
|
|
- * Above ordering makes sure that job1 runs before everyone else.
|
|
|
|
- * Wait for job1 to complete 50%. Note that all the jobs are configured to
|
|
|
|
- * use {@link HalfWaitingMapper} and {@link WaitingReducer}. So job1 will
|
|
|
|
- * eventually wait on 50%
|
|
|
|
- *
|
|
|
|
- * Make a note of the following things
|
|
|
|
- * - Job start times
|
|
|
|
- *
|
|
|
|
- * Restart the jobtracker
|
|
|
|
- *
|
|
|
|
- * Wait for job1 to finish all the maps and note the TaskCompletion events at
|
|
|
|
- * the tracker.
|
|
|
|
- *
|
|
|
|
- * Wait for all the jobs to finish
|
|
|
|
- *
|
|
|
|
- * Also make sure that the order in which the jobs were sorted before restart
|
|
|
|
- * remains same. For this check the follwoing
|
|
|
|
- * job1.start-time < job2.start-time < job3.start-time and
|
|
|
|
- * job1.finish-time < job3.finish-time < job2.finish-time
|
|
|
|
- * This ordering makes sure that the change of priority is logged and
|
|
|
|
- * recovered back
|
|
|
|
- */
|
|
|
|
- public void testRecoveryWithMultipleJobs(MiniDFSCluster dfs,
|
|
|
|
- MiniMRCluster mr)
|
|
|
|
- throws IOException {
|
|
|
|
- FileSystem fileSys = dfs.getFileSystem();
|
|
|
|
- JobConf jobConf = mr.createJobConf();
|
|
|
|
- JobPriority[] priorities = {JobPriority.NORMAL, JobPriority.LOW,
|
|
|
|
- JobPriority.HIGH};
|
|
|
|
- // Note that there is only 1 tracker
|
|
|
|
- int[] numMaps = {50, 1, 1};
|
|
|
|
- int[] numReds = {1, 1, 1};
|
|
|
|
-
|
|
|
|
- cleanUp(fileSys, shareDir);
|
|
|
|
-
|
|
|
|
- // Configure the jobs
|
|
|
|
- JobConf[] jobs = getJobs(jobConf, priorities, numMaps, numReds,
|
|
|
|
- outputDir, inDir,
|
|
|
|
- getMapSignalFile(shareDir),
|
|
|
|
- getReduceSignalFile(shareDir));
|
|
|
|
-
|
|
|
|
- // Master job parameters
|
|
|
|
- int masterJob = 0;
|
|
|
|
- JobPriority masterJobNewPriority = JobPriority.HIGH;
|
|
|
|
-
|
|
|
|
- // Submit a master job
|
|
|
|
- JobClient jobClient = new JobClient(jobs[masterJob]);
|
|
|
|
- RunningJob job = jobClient.submitJob(jobs[masterJob]);
|
|
|
|
- JobID id = job.getID();
|
|
|
|
-
|
|
|
|
- // Wait for the job to be inited
|
|
|
|
- mr.initializeJob(id);
|
|
|
|
-
|
|
|
|
- // Change the master job's priority so that priority logging is tested
|
|
|
|
- mr.setJobPriority(id, masterJobNewPriority);
|
|
|
|
-
|
|
|
|
- // Submit the remaining jobs and find the last job id
|
|
|
|
- for (int i = 1; i < jobs.length; ++i) {
|
|
|
|
- RunningJob rJob = (new JobClient(jobs[i])).submitJob(jobs[i]);
|
|
|
|
- mr.initializeJob(rJob.getID());
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // Make sure that the master job is 50% completed
|
|
|
|
- while (UtilsForTests.getJobStatus(jobClient, id).mapProgress() < 0.5f) {
|
|
|
|
- UtilsForTests.waitFor(100);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // Note the data that needs to be tested upon restart
|
|
|
|
- long jobStartTime = UtilsForTests.getJobStatus(jobClient, id).getStartTime();
|
|
|
|
-
|
|
|
|
- // Kill the jobtracker
|
|
|
|
- mr.stopJobTracker();
|
|
|
|
-
|
|
|
|
- // Signal the maps to complete
|
|
|
|
- UtilsForTests.signalTasks(dfs, fileSys, true, getMapSignalFile(shareDir),
|
|
|
|
- getReduceSignalFile(shareDir));
|
|
|
|
-
|
|
|
|
- // Signal the reducers to complete
|
|
|
|
- UtilsForTests.signalTasks(dfs, fileSys, false, getMapSignalFile(shareDir),
|
|
|
|
- getReduceSignalFile(shareDir));
|
|
|
|
-
|
|
|
|
- // Enable recovery on restart
|
|
|
|
- mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover",
|
|
|
|
- true);
|
|
|
|
-
|
|
|
|
- // Wait for a minute before submitting a job
|
|
|
|
- UtilsForTests.waitFor(60 * 1000);
|
|
|
|
-
|
|
|
|
- // Restart the jobtracker
|
|
|
|
- mr.startJobTracker();
|
|
|
|
-
|
|
|
|
- // Check if the jobs are still running
|
|
|
|
-
|
|
|
|
- // Wait for the JT to be ready
|
|
|
|
- UtilsForTests.waitForJobTracker(jobClient);
|
|
|
|
-
|
|
|
|
- // Check if the job recovered
|
|
|
|
- assertEquals("Restart failed as previously submitted job was missing",
|
|
|
|
- true, UtilsForTests.getJobStatus(jobClient, id) != null);
|
|
|
|
-
|
|
|
|
- // check if the job's priority got changed
|
|
|
|
- assertEquals("Restart failed as job's priority did not match",
|
|
|
|
- true, mr.getJobPriority(id).equals(masterJobNewPriority));
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-
|
|
|
|
- UtilsForTests.waitTillDone(jobClient);
|
|
|
|
-
|
|
|
|
- // Check if the jobs are in order .. the order is 1->3->2
|
|
|
|
- JobStatus[] newStatuses = jobClient.getAllJobs();
|
|
|
|
- // Check if the jobs are in the order of submission
|
|
|
|
- // This is important for the following checks
|
|
|
|
- boolean jobOrder = newStatuses[0].getJobID().getId() == 1
|
|
|
|
- && newStatuses[1].getJobID().getId() == 2
|
|
|
|
- && newStatuses[2].getJobID().getId() == 3;
|
|
|
|
- assertTrue("Job submission order changed", jobOrder);
|
|
|
|
-
|
|
|
|
- // Start times are in order and non zero
|
|
|
|
- boolean startTimeOrder = newStatuses[0].getStartTime() > 0
|
|
|
|
- && newStatuses[0].getStartTime()
|
|
|
|
- < newStatuses[1].getStartTime()
|
|
|
|
- && newStatuses[1].getStartTime()
|
|
|
|
- < newStatuses[2].getStartTime();
|
|
|
|
- assertTrue("Job start-times are out of order", startTimeOrder);
|
|
|
|
-
|
|
|
|
-// boolean finishTimeOrder =
|
|
|
|
-// mr.getJobFinishTime(newStatuses[0].getJobID()) > 0
|
|
|
|
-// && mr.getJobFinishTime(newStatuses[0].getJobID())
|
|
|
|
-// < mr.getJobFinishTime(newStatuses[2].getJobID())
|
|
|
|
-// && mr.getJobFinishTime(newStatuses[2].getJobID())
|
|
|
|
-// < mr.getJobFinishTime(newStatuses[1].getJobID());
|
|
|
|
-// assertTrue("Jobs finish-times are out of order", finishTimeOrder);
|
|
|
|
-
|
|
|
|
-
|
|
|
|
- // This should be used for testing job counters
|
|
|
|
- job.getCounters();
|
|
|
|
-
|
|
|
|
- // check if the job was successful
|
|
|
|
- assertTrue("Previously submitted job was not successful",
|
|
|
|
- job.isSuccessful());
|
|
|
|
-
|
|
|
|
- // Check if the start time was recovered
|
|
|
|
- assertTrue("Previously submitted job's start time has changed",
|
|
|
|
- UtilsForTests.getJobStatus(jobClient, id).getStartTime()
|
|
|
|
- == jobStartTime);
|
|
|
|
-
|
|
|
|
- // Test history files
|
|
|
|
- testJobHistoryFiles(id, jobs[masterJob]);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
* Tests the jobtracker with restart-recovery turned off.
|
|
* Tests the jobtracker with restart-recovery turned off.
|
|
* Submit a job with normal priority, maps = 2, reducers = 0}
|
|
* Submit a job with normal priority, maps = 2, reducers = 0}
|
|
*
|
|
*
|
|
@@ -371,6 +207,9 @@ public class TestJobTrackerRestart extends TestCase {
|
|
RunningJob job = jobClient.submitJob(newConf);
|
|
RunningJob job = jobClient.submitJob(newConf);
|
|
JobID id = job.getID();
|
|
JobID id = job.getID();
|
|
|
|
|
|
|
|
+ // change the job priority
|
|
|
|
+ mr.setJobPriority(id, JobPriority.HIGH);
|
|
|
|
+
|
|
mr.initializeJob(id);
|
|
mr.initializeJob(id);
|
|
|
|
|
|
// make sure that atleast on reducer is spawned
|
|
// make sure that atleast on reducer is spawned
|
|
@@ -447,6 +286,10 @@ public class TestJobTrackerRestart extends TestCase {
|
|
testTaskReports(prevMapReports, afterMapReports, numToMatch - 1);
|
|
testTaskReports(prevMapReports, afterMapReports, numToMatch - 1);
|
|
testTaskReports(prevSetupReports, afterSetupReports, 1);
|
|
testTaskReports(prevSetupReports, afterSetupReports, 1);
|
|
|
|
|
|
|
|
+ // check the job priority
|
|
|
|
+ assertEquals("Job priority change is not reflected",
|
|
|
|
+ JobPriority.HIGH, mr.getJobPriority(id));
|
|
|
|
+
|
|
// Signal the reduce tasks
|
|
// Signal the reduce tasks
|
|
UtilsForTests.signalTasks(dfs, fileSys, false, getMapSignalFile(shareDir),
|
|
UtilsForTests.signalTasks(dfs, fileSys, false, getMapSignalFile(shareDir),
|
|
getReduceSignalFile(shareDir));
|
|
getReduceSignalFile(shareDir));
|
|
@@ -592,9 +435,6 @@ public class TestJobTrackerRestart extends TestCase {
|
|
|
|
|
|
mr = new MiniMRCluster(1, namenode, 1, null, null, jtConf);
|
|
mr = new MiniMRCluster(1, namenode, 1, null, null, jtConf);
|
|
|
|
|
|
- // Test multiple jobs on jobtracker with restart-recovery turned on
|
|
|
|
- testRecoveryWithMultipleJobs(dfs, mr);
|
|
|
|
-
|
|
|
|
// Test the tasktracker SYNC
|
|
// Test the tasktracker SYNC
|
|
testTaskEventsAndReportsWithRecovery(dfs, mr);
|
|
testTaskEventsAndReportsWithRecovery(dfs, mr);
|
|
|
|
|