Source search

Merge -r 772883:772884 from trunk onto 0.20 branch. Fixes HADOOP-4372.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.20@777025 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 16 years ago
Parent
Commit
edb599b1f9

+ 3 - 0
CHANGES.txt

@@ -20,6 +20,9 @@ Release 0.20.1 - Unreleased
     HADOOP-4674. Fix fs help messages for -test, -text, -tail, -stat 
     and -touchz options.  (Ravi Phulari via szetszwo)
 
+    HADOOP-4372. Improves the way history filenames are obtained and manipulated.
+    (Amar Kamat via ddas)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 25 - 9
src/mapred/org/apache/hadoop/mapred/JobHistory.java

@@ -666,12 +666,9 @@ public class JobHistory {
       };
       
       FileStatus[] statuses = fs.listStatus(new Path(LOG_DIR), filter);
-      String filename;
+      String filename = null;
       if (statuses.length == 0) {
-        filename = 
-          encodeJobHistoryFileName(getNewJobHistoryFileName(jobConf, id));
-        LOG.info("Nothing to recover! Generating a new filename " + filename 
-                 + " for job " + id);
+        LOG.info("Nothing to recover for job " + id);
       } else {
         // return filename considering that fact the name can be a 
         // secondary filename like filename.recover
@@ -792,6 +789,9 @@ public class JobHistory {
     throws IOException {
       String masterLogFileName = 
         JobHistory.JobInfo.getJobHistoryFileName(conf, id);
+      if (masterLogFileName == null) {
+        return;
+      }
       Path masterLogPath = 
         JobHistory.JobInfo.getJobHistoryLogLocation(masterLogFileName);
       String tmpLogFileName = getSecondaryJobHistoryFile(masterLogFileName);
@@ -834,9 +834,18 @@ public class JobHistory {
      * @param jobConfPath path to job conf xml file in HDFS.
      * @param submitTime time when job tracker received the job
      * @throws IOException
+     * @deprecated Use 
+     *     {@link #logSubmitted(JobID, JobConf, String, long, boolean)} instead.
      */
     public static void logSubmitted(JobID jobId, JobConf jobConf, 
                                     String jobConfPath, long submitTime) 
+    throws IOException {
+      logSubmitted(jobId, jobConf, jobConfPath, submitTime, true);
+    }
+    
+    public static void logSubmitted(JobID jobId, JobConf jobConf, 
+                                    String jobConfPath, long submitTime, 
+                                    boolean restarted) 
     throws IOException {
       FileSystem fs = null;
       String userLogDir = null;
@@ -850,8 +859,13 @@ public class JobHistory {
         String user = getUserName(jobConf);
         
         // get the history filename
-        String logFileName = 
-          getJobHistoryFileName(jobConf, jobId);
+        String logFileName = null;
+        if (restarted) {
+          logFileName = getJobHistoryFileName(jobConf, jobId);
+        } else {
+          logFileName = 
+            encodeJobHistoryFileName(getNewJobHistoryFileName(jobConf, jobId));
+        }
 
         // setup the history log file for this job
         Path logFile = getJobHistoryLogLocation(logFileName);
@@ -869,8 +883,10 @@ public class JobHistory {
             // create output stream for logging in hadoop.job.history.location
             fs = new Path(LOG_DIR).getFileSystem(jobConf);
             
-            logFile = recoverJobHistoryFile(jobConf, logFile);
-            logFileName = logFile.getName();
+            if (restarted) {
+              logFile = recoverJobHistoryFile(jobConf, logFile);
+              logFileName = logFile.getName();
+            }
             
             int defaultBufferSize = 
               fs.getConf().getInt("io.file.buffer.size", 4096);

+ 5 - 1
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -373,6 +373,10 @@ class JobInProgress {
     return tasksInited.get();
   }
   
+  boolean hasRestarted() {
+    return restartCount > 0;
+  }
+
   /**
    * Construct the splits, etc.  This is invoked from an async
    * thread so that split-computation doesn't block anyone.
@@ -392,7 +396,7 @@ class JobInProgress {
 
     // log job info
     JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile.toString(), 
-                                    this.startTime);
+                                    this.startTime, hasRestarted());
     // log the job priority
     setPriority(this.priority);
     

+ 22 - 15
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -1234,18 +1234,23 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
           // 3. Get the log file and the file path
           String logFileName = 
             JobHistory.JobInfo.getJobHistoryFileName(job.getJobConf(), id);
-          Path jobHistoryFilePath = 
-            JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
-
-          // 4. Recover the history file. This involved
-          //     - deleting file.recover if file exists
-          //     - renaming file.recover to file if file doesnt exist
-          // This makes sure that the (master) file exists
-          JobHistory.JobInfo.recoverJobHistoryFile(job.getJobConf(), 
-                                                   jobHistoryFilePath);
+          if (logFileName != null) {
+            Path jobHistoryFilePath = 
+              JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
+
+            // 4. Recover the history file. This involved
+            //     - deleting file.recover if file exists
+            //     - renaming file.recover to file if file doesnt exist
+            // This makes sure that the (master) file exists
+            JobHistory.JobInfo.recoverJobHistoryFile(job.getJobConf(), 
+                                                     jobHistoryFilePath);
           
-          // 5. Cache the history file name as it costs one dfs access
-          jobHistoryFilenameMap.put(job.getJobID(), jobHistoryFilePath);
+            // 5. Cache the history file name as it costs one dfs access
+            jobHistoryFilenameMap.put(job.getJobID(), jobHistoryFilePath);
+          } else {
+            LOG.info("No history file found for job " + id);
+            idIter.remove(); // remove from recovery list
+          }
 
           // 6. Sumbit the job to the jobtracker
           addJob(id, job);
@@ -2020,10 +2025,12 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
 
     // start the merge of log files
     JobID id = job.getStatus().getJobID();
-    try {
-      JobHistory.JobInfo.finalizeRecovery(id, job.getJobConf());
-    } catch (IOException ioe) {
-      LOG.info("Failed to finalize the log file recovery for job " + id, ioe);
+    if (job.hasRestarted()) {
+      try {
+        JobHistory.JobInfo.finalizeRecovery(id, job.getJobConf());
+      } catch (IOException ioe) {
+        LOG.info("Failed to finalize the log file recovery for job " + id, ioe);
+      }
     }
 
     final JobTrackerInstrumentation metrics = getInstrumentation();