
MAPREDUCE-5405. Job recovery can fail if task log directory symlink from prior run still exists. Contributed by Chris Nauroth.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1@1505760 13f79535-47bb-0310-9956-ffa450edef68
Chris Nauroth, 12 years ago
parent commit e9bfeab4b1

+ 3 - 0
CHANGES.txt

@@ -92,6 +92,9 @@ Release 1.3.0 - unreleased
     HDFS-3794. WebHDFS Open used with Offset returns the original (and incorrect)
     Content Length in the HTTP Header. (Tsz Wo (Nicholas), SZE via cnauroth)
 
+    MAPREDUCE-5405. Job recovery can fail if task log directory symlink from
+    prior run still exists. (cnauroth)
+
 Release 1.2.1 - 2013.07.06
 
   INCOMPATIBLE CHANGES

+ 10 - 0
src/mapred/org/apache/hadoop/mapred/TaskLog.java

@@ -106,6 +106,16 @@ public class TaskLog {
     String strLinkAttemptLogDir = 
         getJobDir(taskID.getJobID()).getAbsolutePath() + File.separatorChar + 
         taskID.toString() + cleanupSuffix;
+    // If the job is recovered, then the symlink might still exist from the prior
+    // run.  Symlink creation fails if it already exists, so attempt to delete
+    // first.
+    File linkAttemptLogDir = new File(strLinkAttemptLogDir);
+    if (linkAttemptLogDir.exists()) {
+      if (!linkAttemptLogDir.delete()) {
+        LOG.warn("Failed to delete existing file at path " +
+          strLinkAttemptLogDir);
+      }
+    }
     if (FileUtil.symLink(strAttemptLogDir, strLinkAttemptLogDir) != 0) {
       throw new IOException("Creation of symlink from " + 
                             strLinkAttemptLogDir + " to " + strAttemptLogDir +
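
A minimal standalone sketch of the pattern this TaskLog.java change relies on (class and method names below are hypothetical, not part of the patch): java.io.File#delete on a symlink removes only the link itself, never the directory it points to, so clearing a stale link left over from a prior run lets FileUtil.symLink succeed again during recovery.

    import java.io.File;
    import java.io.IOException;
    import org.apache.hadoop.fs.FileUtil;

    public class RelinkSketch {
      /** Recreates a symlink, tolerating a stale link left by a prior run. */
      public static void relink(String target, String link) throws IOException {
        File linkFile = new File(link);
        // FileUtil.symLink returns a non-zero exit code if the link path
        // already exists, so delete any leftover link first. Deleting the
        // link does not touch the target directory it points to.
        if (linkFile.exists() && !linkFile.delete()) {
          System.err.println("Failed to delete existing file at path " + link);
        }
        if (FileUtil.symLink(target, link) != 0) {
          throw new IOException("Creation of symlink from " + link + " to " +
                                target + " failed.");
        }
      }
    }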

+ 44 - 14
src/test/org/apache/hadoop/mapred/TestRecoveryManager.java

@@ -52,6 +52,9 @@ public class TestRecoveryManager {
   private static final Path TEST_DIR = 
     new Path(System.getProperty("test.build.data", "/tmp"), 
              "test-recovery-manager");
+  private static final long AWAIT_JOB_CLEANUP_MAX_WAIT_MILLISECONDS = 5000;
+  private static final long AWAIT_JOB_CLEANUP_POLL_PERIOD_MILLISECONDS = 100;
+  private static final long NANOSECONDS_PER_MILLISECOND = 1000000;
   
   private FileSystem fs;
   private MiniDFSCluster dfs;
@@ -174,7 +177,7 @@ public class TestRecoveryManager {
     while (!jip.isComplete()) {
       LOG.info("Waiting for job " + rJob2.getID() + " to be successful");
       // Signaling Map task to complete
-      fs.create(new Path(TEST_DIR, "signal"));
+      fs.create(new Path(TEST_DIR, "signal")).close();
       UtilsForTests.waitFor(100);
     }
     Assert.assertTrue("Job should be successful", rJob2.isSuccessful());
@@ -253,12 +256,12 @@ public class TestRecoveryManager {
     while (!jip.isComplete()) {
       LOG.info("Waiting for job " + rJob1.getID() + " to be successful");
       // Signaling Map task to complete
-      fs.create(new Path(TEST_DIR, "signal"));
+      fs.create(new Path(TEST_DIR, "signal")).close();
       UtilsForTests.waitFor(100);
     }
     Assert.assertTrue("Task should be successful", rJob1.isSuccessful());
-    Assert.assertTrue("Job should be cleaned up", !fs.exists(new Path(job1.get("mapreduce.job.dir"))));
-    Assert.assertTrue("Job should be cleaned up", !fs.exists(new Path(job2.get("mapreduce.job.dir"))));
+    Assert.assertTrue("Job should be cleaned up", awaitJobCleanup(job1));
+    Assert.assertTrue("Job should be cleaned up", awaitJobCleanup(job2));
   }
 
   public static class TestJobTrackerInstrumentation extends JobTrackerInstrumentation {
@@ -356,7 +359,7 @@ public class TestRecoveryManager {
     job1.setJobPriority(JobPriority.HIGH);
     
     UtilsForTests.configureWaitingJobConf(job1, 
-        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output4"), 30, 0,
+        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output5"), 30, 0,
         "test-recovery-manager", signalFile, signalFile);
     
     // submit the faulty job
@@ -374,7 +377,7 @@ public class TestRecoveryManager {
 
     String signalFile1 = new Path(TEST_DIR, "signal1").toString();
     UtilsForTests.configureWaitingJobConf(job2, 
-        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output5"), 20, 0,
+        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output6"), 20, 0,
         "test-recovery-manager", signalFile1, signalFile1);
     
     // submit the job
@@ -395,7 +398,7 @@ public class TestRecoveryManager {
       UserGroupInformation.createUserForTesting("abc", new String[]{"users"});
     
     UtilsForTests.configureWaitingJobConf(job3, 
-        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output6"), 1, 0,
+        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output7"), 1, 0,
         "test-recovery-manager", signalFile, signalFile);
     
     // submit the job
@@ -453,7 +456,7 @@ public class TestRecoveryManager {
     while (!jip.isComplete()) {
       LOG.info("Waiting for job " + rJob2.getID() + " to be successful");
       // Signaling Map task to complete
-      fs.create(new Path(TEST_DIR, "signal1"));
+      fs.create(new Path(TEST_DIR, "signal1")).close();
       UtilsForTests.waitFor(100);
     }
     Assert.assertTrue("Job should be successful", rJob2.isSuccessful());
@@ -488,7 +491,7 @@ public class TestRecoveryManager {
     job1.setJobPriority(JobPriority.HIGH);
 
     UtilsForTests.configureWaitingJobConf(job1, new Path(TEST_DIR, "input"),
-        new Path(TEST_DIR, "output7"), 30, 0, "test-restart", signalFile,
+        new Path(TEST_DIR, "output8"), 30, 0, "test-restart", signalFile,
         signalFile);
 
     // submit the faulty job
@@ -528,7 +531,7 @@ public class TestRecoveryManager {
     JobConf job2 = mr.createJobConf();
 
     UtilsForTests.configureWaitingJobConf(job2, new Path(TEST_DIR, "input"),
-        new Path(TEST_DIR, "output8"), 50, 0, "test-restart-manager",
+        new Path(TEST_DIR, "output9"), 50, 0, "test-restart-manager",
         signalFile, signalFile);
 
     // submit a new job
@@ -658,7 +661,7 @@ public class TestRecoveryManager {
 
     final JobConf job1 = mr.createJobConf();
     UtilsForTests.configureWaitingJobConf(job1, new Path(HDFS_TEST_DIR, "input"),
-        new Path(HDFS_TEST_DIR, "output3"), 2, 0, "test-resubmission", signalFile,
+        new Path(HDFS_TEST_DIR, "output10"), 2, 0, "test-resubmission", signalFile,
         signalFile);
 
     UserGroupInformation ugi =
@@ -718,7 +721,7 @@ public class TestRecoveryManager {
     JobInProgress jip = jobtracker.getJob(rJob1.getID());
 
     // Signaling Map task to complete
-    fs.create(new Path(HDFS_TEST_DIR, "signal"));
+    fs.create(new Path(HDFS_TEST_DIR, "signal")).close();
     while (!jip.isComplete()) {
       LOG.info("Waiting for job " + rJob1.getID() + " to be successful");
       UtilsForTests.waitFor(100);
@@ -764,7 +767,7 @@ public class TestRecoveryManager {
 
     final JobConf job1 = mr.createJobConf();
     UtilsForTests.configureWaitingJobConf(job1, new Path(HDFS_TEST_DIR, "input"),
-        new Path(HDFS_TEST_DIR, "output3"), 2, 0, "test-resubmission", signalFile,
+        new Path(HDFS_TEST_DIR, "output11"), 2, 0, "test-resubmission", signalFile,
         signalFile);
 
     UserGroupInformation ugi =
@@ -805,7 +808,7 @@ public class TestRecoveryManager {
     JobInProgress jip = jobtracker.getJob(rJob1.getID());
 
     // Signaling Map task to complete
-    fs.create(new Path(HDFS_TEST_DIR, "signal"));
+    fs.create(new Path(HDFS_TEST_DIR, "signal")).close();
     while (!jip.isComplete()) {
       LOG.info("Waiting for job " + rJob1.getID() + " to be successful");
       UtilsForTests.waitFor(100);
@@ -813,4 +816,31 @@ public class TestRecoveryManager {
     rJob1 = jc.getJob(rJob1.getID());
     Assert.assertTrue("Task should be successful", rJob1.isSuccessful());
   }
+
+  /**
+   * Awaits completion of job cleanup, which runs asynchronously on a background
+   * thread.  Job cleanup removes the job directory.  This method periodically
+   * polls for the existence of that directory, waiting up to a maximum time for
+   * the cleanup to complete.
+   * 
+   * @param jobConf JobConf to inspect
+   * @return boolean true if job cleanup detected, or false if job cleanup was
+   *   not detected within the maximum wait time
+   * @throws IOException if there is an I/O error checking the directory
+   * @throws InterruptedException if thread interrupted while sleeping
+   */
+  private boolean awaitJobCleanup(JobConf jobConf) throws IOException,
+      InterruptedException {
+    Path jobDir = new Path(jobConf.get("mapreduce.job.dir"));
+    long start = System.nanoTime() / NANOSECONDS_PER_MILLISECOND;
+    long elapsed = 0;
+    do {
+      if (!fs.exists(jobDir)) {
+        return true;
+      }
+      Thread.sleep(AWAIT_JOB_CLEANUP_POLL_PERIOD_MILLISECONDS);
+      elapsed = (System.nanoTime() / NANOSECONDS_PER_MILLISECOND) - start;
+    } while (elapsed < AWAIT_JOB_CLEANUP_MAX_WAIT_MILLISECONDS);
+    return false;
+  }
 }
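
The awaitJobCleanup helper replaces the earlier one-shot fs.exists assertions with a time-bounded poll, since job cleanup runs asynchronously on a background thread. The same wait-or-timeout pattern in isolation, with hypothetical names (a sketch, not part of the patch):

    import java.util.concurrent.Callable;
    import java.util.concurrent.TimeUnit;

    public class AwaitSketch {
      /**
       * Polls the condition until it returns true or maxWaitMs elapses.
       * Returns true if the condition was observed, false on timeout.
       */
      static boolean awaitCondition(Callable<Boolean> condition, long maxWaitMs,
          long pollPeriodMs) throws Exception {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxWaitMs);
        do {
          if (condition.call()) {
            return true;
          }
          Thread.sleep(pollPeriodMs);
        } while (System.nanoTime() < deadline);
        return false;
      }
    }

In the test above, the condition is the disappearance of the job directory named by mapreduce.job.dir, polled every 100 ms for up to 5 seconds.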