Quellcode durchsuchen

Merge -r 752072:752073 from trunk onto 0.19 branch. Fixes HADOOP-5392.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.19@752077 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das vor 16 Jahren
Ursprung
Commit
0a1c94da9c

+ 3 - 0
CHANGES.txt

@@ -49,6 +49,9 @@ Release 0.19.2 - Unreleased
     references to completedJobStore outside the block where the JobTracker is locked.
     (ddas)
 
+    HADOOP-5392. Fixes a problem to do with JT crashing during recovery when
+    the job files are garbled. (Amar Kamat via ddas)
+ 
     HADOOP-5421. Removes the test TestRecoveryManager.java from the 0.19 branch
     as it has compilation issues. (ddas) 
 

+ 47 - 27
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -545,11 +545,11 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
           LOG.info("Calling init from RM for job " + jip.getJobID().toString());
           try {
             jip.initTasks();
-          } catch (IOException ioe) {
+          } catch (Throwable t) {
             LOG.error("Job initialization failed : \n" 
-                      + StringUtils.stringifyException(ioe));
+                      + StringUtils.stringifyException(t));
             jip.fail(); // fail the job
-            throw ioe;
+            throw new IOException(t);
           }
         }
       }
@@ -820,19 +820,19 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
      expireLaunchingTasks.removeTask(attemptId);
     }
   
-    public void recover() throws IOException {
+    public void recover() {
       // I. Init the jobs and cache the recovered job history filenames
       Map<JobID, Path> jobHistoryFilenameMap = new HashMap<JobID, Path>();
       Iterator<JobID> idIter = jobsToRecover.iterator();
       while (idIter.hasNext()) {
         JobID id = idIter.next();
-        LOG.info("Trying to recover job " + id);
-        // 1. Create the job object
-        JobInProgress job = new JobInProgress(id, JobTracker.this, conf);
-        
-        String logFileName;
-        Path jobHistoryFilePath;
+        LOG.info("Trying to recover details of job " + id);
         try {
+          // 1. Create the job object
+          JobInProgress job = new JobInProgress(id, JobTracker.this, conf);
+          String logFileName;
+          Path jobHistoryFilePath;
+
           // 2. Get the log file and the file path
           logFileName = 
             JobHistory.JobInfo.getJobHistoryFileName(job.getJobConf(), id);
@@ -845,19 +845,19 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
           // This makes sure that the (master) file exists
           JobHistory.JobInfo.recoverJobHistoryFile(job.getJobConf(), 
                                                    jobHistoryFilePath);
-        } catch (IOException ioe) {
-          LOG.warn("Failed to recover job " + id + " history filename." 
-                   + " Ignoring.", ioe);
+          
+          // 4. Cache the history file name as it costs one dfs access
+          jobHistoryFilenameMap.put(job.getJobID(), jobHistoryFilePath);
+
+          // 5. Submit the job to the jobtracker
+          addJob(id, job);
+        } catch (Throwable t) {
+          LOG.warn("Failed to recover job " + id + " history details." 
+                   + " Ignoring.", t);
           // TODO : remove job details from the system directory
           idIter.remove();
           continue;
         }
-
-        // 4. Cache the history file name as it costs one dfs access
-        jobHistoryFilenameMap.put(job.getJobID(), jobHistoryFilePath);
-
-        // 5. Submit the job to the jobtracker
-        addJob(id, job);
       }
 
       long recoveryStartTime = System.currentTimeMillis();
@@ -871,7 +871,14 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         Path jobHistoryFilePath = jobHistoryFilenameMap.get(pJob.getJobID());
         String logFileName = jobHistoryFilePath.getName();
 
-        FileSystem fs = jobHistoryFilePath.getFileSystem(conf);
+        FileSystem fs;
+        try {
+          fs = jobHistoryFilePath.getFileSystem(conf);
+        } catch (IOException ioe) {
+          LOG.warn("Failed to get the filesystem for job " + id + ". Ignoring.",
+                   ioe);
+          continue;
+        }
 
         // 2. Parse the history file
         // Note that this also involves job update
@@ -879,9 +886,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         try {
           JobHistory.parseHistoryFromFS(jobHistoryFilePath.toString(), 
                                         listener, fs);
-        } catch (IOException e) {
-          LOG.info("JobTracker failed to recover job " + pJob.getJobID() + "."
-                     + " Ignoring it.", e);
+        } catch (Throwable t) {
+          LOG.info("JobTracker failed to recover job " + pJob.getJobID() 
+                   + " from history. Ignoring it.", t);
         }
 
         // 3. Close the listener
@@ -900,9 +907,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
             JobHistory.JobInfo.checkpointRecovery(logFileName, 
                                                   pJob.getJobConf());
           }
-        } catch (IOException ioe) {
+        } catch (Throwable t) {
           LOG.warn("Failed to delete log file (" + logFileName + ") for job " 
-                   + id + ". Ignoring it.", ioe);
+                   + id + ". Ignoring it.", t);
         }
 
         // 6. Inform the jobtracker as to how much of the data is recovered.
@@ -1197,7 +1204,12 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
             && !JobHistory.isDisableHistory()
             && systemDirData != null) {
           for (FileStatus status : systemDirData) {
-            recoveryManager.checkAndAddJob(status);
+            try {
+              recoveryManager.checkAndAddJob(status);
+            } catch (Throwable t) {
+              LOG.warn("Failed to add the job " + status.getPath().getName(), 
+                       t);
+            }
           }
           
           // Check if there are jobs to be recovered
@@ -1313,7 +1325,11 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     taskScheduler.start();
     
     //  Start the recovery after starting the scheduler
-    recoveryManager.recover();
+    try {
+      recoveryManager.recover();
+    } catch (Throwable t) {
+      LOG.warn("Recovery manager crashed! Ignoring.", t);
+    }
     
     this.expireTrackersThread = new Thread(this.expireTrackers,
                                           "expireTrackers");
@@ -2738,6 +2754,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         // if job is not there in the cleanup list ... add it
         synchronized (trackerToJobsToCleanup) {
           Set<JobID> jobs = trackerToJobsToCleanup.get(trackerName);
+          if (jobs == null) {
+            jobs = new HashSet<JobID>();
+            trackerToJobsToCleanup.put(trackerName, jobs);
+          }
           jobs.add(taskId.getJobID());
         }
         continue;

+ 22 - 0
src/test/org/apache/hadoop/mapred/MiniMRCluster.java

@@ -61,6 +61,7 @@ public class MiniMRCluster {
    */
   class JobTrackerRunner implements Runnable {
     private JobTracker tracker = null;
+    private volatile boolean isActive = true;
     
     JobConf jc = null;
         
@@ -72,6 +73,10 @@ public class MiniMRCluster {
       return (tracker != null);
     }
         
+    public boolean isActive() {
+      return isActive;
+    }
+
     public int getJobTrackerPort() {
       return tracker.getTrackerPort();
     }
@@ -97,6 +102,7 @@ public class MiniMRCluster {
         tracker.offerService();
       } catch (Throwable e) {
         LOG.error("Job tracker crashed", e);
+        isActive = false;
       }
     }
         
@@ -111,6 +117,7 @@ public class MiniMRCluster {
       } catch (Throwable e) {
         LOG.error("Problem shutting down job tracker", e);
       }
+      isActive = false;
     }
   }
     
@@ -548,6 +555,21 @@ public class MiniMRCluster {
       }
     }
         
+    ClusterStatus status = jobTracker.getJobTracker().getClusterStatus(false);
+    while (jobTracker.isActive() && status.getJobTrackerState() == JobTracker.State.INITIALIZING) {
+      try {
+        LOG.info("JobTracker still initializing. Waiting.");
+        Thread.sleep(1000);
+      } catch(InterruptedException e) {}
+      status = jobTracker.getJobTracker().getClusterStatus(false);
+    }
+
+    if (!jobTracker.isActive() 
+        || status.getJobTrackerState() != JobTracker.State.RUNNING) {
+      // return if jobtracker has crashed
+      return;
+    }
+ 
     // Set the configuration for the task-trackers
     this.jobTrackerPort = jobTracker.getJobTrackerPort();
     this.jobTrackerInfoPort = jobTracker.getJobTrackerInfoPort();