Browse Source

Merge -r 755997:755998 from trunk onto 0.20 branch. Fixes HADOOP-5328.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.20@756000 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 16 years ago
parent
commit
31df4efbcc

+ 3 - 0
CHANGES.txt

@@ -773,6 +773,9 @@ Release 0.20.0 - Unreleased
     HADOOP-5534. Fixed a deadlock in Fair scheduler's servlet.
     (Rahul Kumar Singh via yhemanth)
 
+    HADOOP-5328. Fixes a problem in the renaming of job history files during job
+    recovery. (Amar Kamat via ddas)
+
 Release 0.19.2 - Unreleased
 
   BUG FIXES

+ 53 - 9
src/mapred/org/apache/hadoop/mapred/JobHistory.java

@@ -640,13 +640,12 @@ public class JobHistory {
       if (LOG_DIR == null) {
         return null;
       }
-      
-      jobName = escapeRegexChars( jobName );
 
       // Make the pattern matching the job's history file
       final Pattern historyFilePattern = 
         Pattern.compile(jobtrackerHostname + "_" + DIGITS + "_" 
-                        + id.toString() + "_" + user + "_" + jobName + "+");
+                        + id.toString() + "_" + user + "_" 
+                        + escapeRegexChars(jobName) + "+");
       // a path filter that matches 4 parts of the filenames namely
       //  - jt-hostname
       //  - job-id
@@ -671,6 +670,8 @@ public class JobHistory {
       if (statuses.length == 0) {
         filename = 
           encodeJobHistoryFileName(getNewJobHistoryFileName(jobConf, id));
+        LOG.info("Nothing to recover! Generating a new filename " + filename 
+                 + " for job " + id);
       } else {
         // return filename considering that fact the name can be a 
         // secondary filename like filename.recover
@@ -681,6 +682,8 @@ public class JobHistory {
           filename = filename.substring(0, newLength);
         }
         filename = encodeJobHistoryFileName(filename);
+        LOG.info("Recovered job history filename for job " + id + " is " 
+                 + filename);
       }
       return filename;
     }
@@ -698,6 +701,7 @@ public class JobHistory {
       Path logPath = JobHistory.JobInfo.getJobHistoryLogLocation(fileName);
       if (logPath != null) {
         FileSystem fs = logPath.getFileSystem(conf);
+        LOG.info("Deleting job history file " + logPath.getName());
         fs.delete(logPath, false);
       }
       // do the same for the user file too
@@ -725,23 +729,57 @@ public class JobHistory {
     public synchronized static Path recoverJobHistoryFile(JobConf conf, 
                                                           Path logFilePath) 
     throws IOException {
+      Path ret;
       FileSystem fs = logFilePath.getFileSystem(conf);
-      String tmpFilename = getSecondaryJobHistoryFile(logFilePath.getName());
+      String logFileName = logFilePath.getName();
+      String tmpFilename = getSecondaryJobHistoryFile(logFileName);
       Path logDir = logFilePath.getParent();
       Path tmpFilePath = new Path(logDir, tmpFilename);
       if (fs.exists(logFilePath)) {
+        LOG.info(logFileName + " exists!");
         if (fs.exists(tmpFilePath)) {
+          LOG.info("Deleting " + tmpFilename 
+                   + "  and using " + logFileName + " for recovery.");
           fs.delete(tmpFilePath, false);
         }
-        return tmpFilePath;
+        ret = tmpFilePath;
       } else {
+        LOG.info(logFileName + " doesnt exist! Using " 
+                 + tmpFilename + " for recovery.");
         if (fs.exists(tmpFilePath)) {
+          LOG.info("Renaming " + tmpFilename + " to " + logFileName);
           fs.rename(tmpFilePath, logFilePath);
-          return tmpFilePath;
+          ret = tmpFilePath;
+        } else {
+          ret = logFilePath;
+        }
+      }
+
+      // do the same for the user files too
+      logFilePath = getJobHistoryLogLocationForUser(logFileName, conf);
+      if (logFilePath != null) {
+        fs = logFilePath.getFileSystem(conf);
+        logDir = logFilePath.getParent();
+        tmpFilePath = new Path(logDir, tmpFilename);
+        if (fs.exists(logFilePath)) {
+          LOG.info(logFileName + " exists!");
+          if (fs.exists(tmpFilePath)) {
+            LOG.info("Deleting " + tmpFilename + "  and making " + logFileName 
+                     + " as the master history file for user.");
+            fs.delete(tmpFilePath, false);
+          }
         } else {
-          return logFilePath;
+          LOG.info(logFileName + " doesnt exist! Using " 
+                   + tmpFilename + " as the master history file for user.");
+          if (fs.exists(tmpFilePath)) {
+            LOG.info("Renaming " + tmpFilename + " to " + logFileName 
+                     + " in user directory");
+            fs.rename(tmpFilePath, logFilePath);
+          }
         }
       }
+      
+      return ret;
     }
 
     /** Finalize the recovery and make one file in the end. 
@@ -765,6 +803,7 @@ public class JobHistory {
         // rename the tmp file to the master file. Note that this should be 
         // done only when the file is closed and handles are released.
         if(fs.exists(tmpLogPath)) {
+          LOG.info("Renaming " + tmpLogFileName + " to " + masterLogFileName);
           fs.rename(tmpLogPath, masterLogPath);
         }
       }
@@ -779,6 +818,8 @@ public class JobHistory {
       if (masterLogPath != null) {
         FileSystem fs = masterLogPath.getFileSystem(conf);
         if (fs.exists(tmpLogPath)) {
+          LOG.info("Renaming " + tmpLogFileName + " to " + masterLogFileName
+                   + " in user directory");
           fs.rename(tmpLogPath, masterLogPath);
         }
       }
@@ -829,6 +870,7 @@ public class JobHistory {
             fs = new Path(LOG_DIR).getFileSystem(jobConf);
             
             logFile = recoverJobHistoryFile(jobConf, logFile);
+            logFileName = logFile.getName();
             
             int defaultBufferSize = 
               fs.getConf().getInt("io.file.buffer.size", 4096);
@@ -842,13 +884,15 @@ public class JobHistory {
             writers.add(writer);
           }
           if (userLogFile != null) {
+            // Get the actual filename as recoverJobHistoryFile() might return
+            // a different filename
             userLogDir = userLogFile.getParent().toString();
+            userLogFile = new Path(userLogDir, logFileName);
+            
             // create output stream for logging 
             // in hadoop.job.history.user.location
             fs = userLogFile.getFileSystem(jobConf);
  
-            userLogFile = recoverJobHistoryFile(jobConf, userLogFile);
-            
             out = fs.create(userLogFile, true, 4096);
             writer = new PrintWriter(out);
             writers.add(writer);

+ 1 - 1
src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java

@@ -65,7 +65,7 @@ public class TestJobTrackerRestart extends TestCase {
       jobs[i] = new JobConf(conf);
       Path newOutputDir = outputDir.suffix(String.valueOf(numJobsSubmitted++));
       UtilsForTests.configureWaitingJobConf(jobs[i], inDir, newOutputDir, 
-          numMaps[i], numReds[i], "jt-restart-test-job", mapSignalFile, 
+          numMaps[i], numReds[i], "jt restart test job", mapSignalFile, 
           reduceSignalFile);
       jobs[i].setJobPriority(priorities[i]);
     }