Prechádzať zdrojové kódy

MAPREDUCE-4403. Test case for job resubmission in TestRecoveryManager. Contributed by Mayank Bansal.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.22@1361069 13f79535-47bb-0310-9956-ffa450edef68
Konstantin Shvachko 13 rokov pred
rodič
commit
83dc171b2d

+ 3 - 0
mapreduce/CHANGES.txt

@@ -8,6 +8,9 @@ Release 0.22.1 - Unreleased
 
   IMPROVEMENTS
 
+    MAPREDUCE-4403. Test case for job resubmission in TestRecoveryManager.
+    (Mayank Bansal via shv)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 68 - 0
mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java

@@ -144,6 +144,74 @@ public class TestRecoveryManager extends TestCase {
     mr.shutdown();
   }
   
+  /**
+   * Tests the re-submission of the job in case of jobtracker died/restart -
+   * submits a job and let it be inited. - kills the jobtracker - checks if the
+   * jobtraker starts normally and job is recovered while
+   */
+
+  public void testJobResubmission() throws Exception {
+
+    LOG.info("Testing Job Resubmission");
+    Path TEST_DIR = new Path(
+        System.getProperty("test.build.data", "build/tmp"),
+        "test-recovery-manager");
+    JobConf conf = new JobConf();
+    FileSystem fs = FileSystem.get(new Configuration());
+    fs.delete(TEST_DIR, true);
+    conf.set(JTConfig.JT_JOBHISTORY_BLOCK_SIZE, "1024");
+    MiniMRCluster mr = null;
+    try {
+      mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
+
+      String signalFile = new Path(TEST_DIR, "signal").toString();
+
+      // make sure that the jobtracker is in recovery mode
+      mr.getJobTrackerConf().setBoolean(JTConfig.JT_RESTART_ENABLED, true);
+
+      JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
+
+      JobConf job1 = mr.createJobConf();
+      UtilsForTests.configureWaitingJobConf(job1, new Path(TEST_DIR, "input"),
+          new Path(TEST_DIR, "output3"), 2, 0, "test-resubmission", signalFile,
+          signalFile);
+
+      JobClient jc = new JobClient(job1);
+      RunningJob rJob1 = jc.submitJob(job1);
+      LOG.info("Submitted first job " + rJob1.getID());
+
+      while (rJob1.mapProgress() < 0.5f) {
+        LOG.info("Waiting for job " + rJob1.getID() + " to be 50% done");
+        UtilsForTests.waitFor(100);
+      }
+
+      // kill the jobtracker
+      LOG.info("Stopping jobtracker");
+      mr.stopJobTracker();
+
+      // start the jobtracker
+      LOG.info("Starting jobtracker");
+      mr.startJobTracker();
+      UtilsForTests.waitForJobTracker(jc);
+
+      jobtracker = mr.getJobTrackerRunner().getJobTracker();
+
+      // assert that job is recovered by the jobtracker
+      assertEquals("Resubmission failed ", 1, jobtracker.getAllJobs().length);
+      JobInProgress jip = jobtracker.getJob(rJob1.getID());
+      while (!jip.isComplete()) {
+        LOG.info("Waiting for job " + rJob1.getID() + " to be successful");
+        // Signaling Map task to complete
+        fs.create(new Path(TEST_DIR, "signal"));
+        UtilsForTests.waitFor(100);
+      }
+      assertTrue("Task should be successful", rJob1.isSuccessful());
+
+    } finally {
+      mr.shutdown();
+    }
+  }
+  
   /**
    * Tests the {@link JobTracker.RecoveryManager} against the exceptions thrown 
    * during recovery. It does the following :