View Source Code

commit 5a5b9ce1a4fa99c91e497d8ee6cd50f24b326393
Author: Yahoo! <ltucker@yahoo-inc.com>
Date: Thu Aug 13 09:35:35 2009 -0700

Applying patch 2935902.mr817.patch


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1076970 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 14 years ago
Parent
Commit
bb5cf2b62c

+ 15 - 0
src/mapred/mapred-default.xml

@@ -322,6 +322,21 @@
   before delegating them to the job history.</description>
 </property>
 
+<property>
+  <name>mapred.job.tracker.retiredjobs.cache.size</name>
+  <value>1000</value>
+  <description>The number of retired job status to keep in the cache.
+  </description>
+</property>
+
+<property>
+  <name>mapred.job.tracker.jobhistory.lru.cache.size</name>
+  <value>5</value>
+  <description>The number of job history files loaded in memory. The jobs are 
+  loaded when they are first accessed. The cache is cleared based on LRU.
+  </description>
+</property>
+
 <property>
   <name>mapred.jobtracker.instrumentation</name>
   <value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value>

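Both new keys are plain integers read through Hadoop's configuration API; the JobTracker and JSPUtil hunks below fetch them with conf.getInt and the same defaults shown above. A minimal sketch of reading them (the class name is illustrative, not part of the patch):

import org.apache.hadoop.mapred.JobConf;

public class RetiredJobsConfigExample {
  public static void main(String[] args) {
    JobConf conf = new JobConf(); // loads mapred-default.xml / mapred-site.xml
    // The second argument is the fallback used when the key is absent.
    int retiredJobsCacheSize =
        conf.getInt("mapred.job.tracker.retiredjobs.cache.size", 1000);
    int historyLruCacheSize =
        conf.getInt("mapred.job.tracker.jobhistory.lru.cache.size", 5);
    System.out.println("retired-jobs cache size: " + retiredJobsCacheSize);
    System.out.println("history LRU cache size:  " + historyLruCacheSize);
  }
}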
+ 120 - 2
src/mapred/org/apache/hadoop/mapred/JSPUtil.java

@@ -18,21 +18,39 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
-import java.util.Iterator;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
 import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
 
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobHistory.JobInfo;
+import org.apache.hadoop.mapred.JobTracker.RetireJobInfo;
 import org.apache.hadoop.util.ServletUtil;
+import org.apache.hadoop.util.StringUtils;
 
 class JSPUtil {
   private static final String PRIVATE_ACTIONS_KEY = "webinterface.private.actions";
   
   public static final Configuration conf = new Configuration();
 
+  //LRU based cache
+  private static final Map<String, JobInfo> jobHistoryCache = 
+    new LinkedHashMap<String, JobInfo>(); 
+
+  private static final int CACHE_SIZE = 
+    conf.getInt("mapred.job.tracker.jobhistory.lru.cache.size", 5);
+
+  private static final Log LOG = LogFactory.getLog(JSPUtil.class);
   /**
    * Method used to process the request from the job page based on the 
    * request which it has received. For example like changing priority.
@@ -181,4 +199,104 @@ class JSPUtil {
     return sb.toString();
   }
 
+  @SuppressWarnings("unchecked")
+  public static String generateRetiredJobTable(JobTracker tracker, int rowId) 
+    throws IOException {
+
+    StringBuffer sb = new StringBuffer();
+    sb.append("<table border=\"1\" cellpadding=\"5\" cellspacing=\"0\">\n");
+
+    Iterator<RetireJobInfo> iterator = 
+      tracker.retireJobs.getAll().descendingIterator();
+    if (!iterator.hasNext()) {
+      sb.append("<tr><td align=\"center\" colspan=\"8\"><i>none</i>" +
+      "</td></tr>\n");
+    } else {
+      sb.append("<tr>");
+      
+      sb.append("<td><b>Jobid</b></td>");
+      sb.append("<td><b>Priority</b></td>");
+      sb.append("<td><b>User</b></td>");
+      sb.append("<td><b>Name</b></td>");
+      sb.append("<td><b>State</b></td>");
+      sb.append("<td><b>Start Time</b></td>");
+      sb.append("<td><b>Finish Time</b></td>");
+      sb.append("<td><b>Map % Complete</b></td>");
+      sb.append("<td><b>Reduce % Complete</b></td>");
+      sb.append("<td><b>Job Scheduling Information</b></td>");
+      sb.append("</tr>\n");
+      for (int i = 0; i < 100 && iterator.hasNext(); i++) {
+        RetireJobInfo info = iterator.next();
+        String historyFile = info.getHistoryFile();
+        String historyFileUrl = null;
+        if (historyFile != null && !historyFile.equals("")) {
+          try {
+            historyFileUrl = URLEncoder.encode(info.getHistoryFile(), "UTF-8");
+          } catch (UnsupportedEncodingException e) {
+            LOG.warn("Can't create history url ", e);
+          }
+        }
+        sb.append("<tr>");
+        sb.append(
+            "<td id=\"job_" + rowId + "\">" + 
+            
+              (historyFileUrl == null ? "" :
+              "<a href=\"jobdetailshistory.jsp?jobid=" + 
+              info.status.getJobId() + "&logFile=" + historyFileUrl + "\">") + 
+              
+              info.status.getJobId() + "</a></td>" +
+            
+            "<td id=\"priority_" + rowId + "\">" + 
+              info.status.getJobPriority().toString() + "</td>" +
+            "<td id=\"user_" + rowId + "\">" + info.profile.getUser() 
+              + "</td>" +
+            "<td id=\"name_" + rowId + "\">" + info.profile.getJobName() 
+              + "</td>" +
+            "<td>" + JobStatus.getJobRunState(info.status.getRunState()) 
+              + "</td>" +
+            "<td>" + new Date(info.status.getStartTime()) + "</td>" +
+            "<td>" + new Date(info.finishTime) + "</td>" +
+            
+            "<td>" + StringUtils.formatPercent(info.status.mapProgress(), 2)
+            + ServletUtil.percentageGraph(info.status.mapProgress() * 100, 80) + 
+              "</td>" +
+            
+            "<td>" + StringUtils.formatPercent(info.status.reduceProgress(), 2)
+            + ServletUtil.percentageGraph(
+               info.status.reduceProgress() * 100, 80) + 
+              "</td>" +
+            
+            "<td>" + info.status.getSchedulingInfo() + "</td>" +
+            
+            "</tr>\n");
+        rowId++;
+      }
+    }
+    sb.append("</table>\n");
+    return sb.toString();
+  }
+
+  static JobInfo getJobInfo(HttpServletRequest request, FileSystem fs) 
+      throws IOException {
+    String jobid = request.getParameter("jobid");
+    String logFile = request.getParameter("logFile");
+    synchronized(jobHistoryCache) {
+      JobInfo jobInfo = jobHistoryCache.remove(jobid);
+      if (jobInfo == null) {
+        jobInfo = new JobHistory.JobInfo(jobid);
+        LOG.info("Loading Job History file "+jobid + ".   Cache size is " +
+            jobHistoryCache.size());
+        DefaultJobHistoryParser.parseJobTasks( logFile, jobInfo, fs) ; 
+      }
+      jobHistoryCache.put(jobid, jobInfo);
+      if (jobHistoryCache.size() > CACHE_SIZE) {
+        Iterator<Map.Entry<String, JobInfo>> it = 
+          jobHistoryCache.entrySet().iterator();
+        String removeJobId = it.next().getKey();
+        it.remove();
+        LOG.info("Job History file removed form cache "+removeJobId);
+      }
+      return jobInfo;
+    }
+  }
 }

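getJobInfo builds its LRU by hand on an insertion-ordered LinkedHashMap: every lookup removes and re-puts the entry (moving it to the tail), and once the map outgrows CACHE_SIZE the iterator's first entry, the least recently touched one, is evicted. The same policy can come from LinkedHashMap directly via its access-order constructor plus removeEldestEntry; a standalone sketch with generic types rather than JobInfo:

import java.util.LinkedHashMap;
import java.util.Map;

// A minimal LRU cache equivalent to JSPUtil's remove/re-put scheme.
class LruCache<K, V> extends LinkedHashMap<K, V> {
  private final int capacity;

  LruCache(int capacity) {
    // accessOrder=true: get() and put() both move entries to the tail.
    super(16, 0.75f, true);
    this.capacity = capacity;
  }

  @Override
  protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
    // Evict the least-recently-used entry once we grow past capacity.
    return size() > capacity;
  }
}

public class LruCacheExample {
  public static void main(String[] args) {
    LruCache<String, String> cache = new LruCache<String, String>(2);
    cache.put("a", "1");
    cache.put("b", "2");
    cache.get("a");                      // touch "a"; "b" is now eldest
    cache.put("c", "3");                 // evicts "b"
    System.out.println(cache.keySet());  // prints [a, c]
  }
}

Wrapped in the same synchronized block JSPUtil uses, this yields the identical eviction order with less bookkeeping.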
+ 20 - 6
src/mapred/org/apache/hadoop/mapred/JobClient.java

@@ -1341,7 +1341,10 @@ public class JobClient extends Configured implements MRConstants, Tool  {
       }
     }
     LOG.info("Job complete: " + jobId);
-    job.getCounters().log(LOG);
+    Counters counters = job.getCounters();
+    if (counters != null) {
+      counters.log(LOG);
+    }
     return job.isSuccessful();
   }
 
@@ -1682,7 +1685,12 @@ public class JobClient extends Configured implements MRConstants, Tool  {
         } else {
           System.out.println();
           System.out.println(job);
-          System.out.println(job.getCounters());
+          Counters counters = job.getCounters();
+          if (counters != null) {
+            System.out.println(counters);
+          } else {
+            System.out.println("Counters not available. Job is retired.");
+          }
           exitCode = 0;
         }
       } else if (getCounter) {
@@ -1691,10 +1699,16 @@ public class JobClient extends Configured implements MRConstants, Tool  {
           System.out.println("Could not find job " + jobid);
         } else {
           Counters counters = job.getCounters();
-          Group group = counters.getGroup(counterGroupName);
-          Counter counter = group.getCounterForName(counterName);
-          System.out.println(counter.getCounter());
-          exitCode = 0;
+          if (counters == null) {
+            System.out.println("Counters not available for retired job " + 
+                jobid);
+            exitCode = -1;
+          } else {
+            Group group = counters.getGroup(counterGroupName);
+            Counter counter = group.getCounterForName(counterName);
+            System.out.println(counter.getCounter());
+            exitCode = 0;
+          }
         }
       } else if (killJob) {
         RunningJob job = getJob(JobID.forName(jobid));

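The common thread in these JobClient hunks: once a job is retired, RunningJob.getCounters() may return null, because only the job's status and profile survive in the JobTracker's retired-jobs cache, so every caller now null-checks before use. A hedged usage sketch (the job id is hypothetical):

import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;

public class RetiredCountersExample {
  public static void main(String[] args) throws Exception {
    JobClient client = new JobClient(new JobConf());
    RunningJob job = client.getJob(JobID.forName("job_200908130935_0001"));
    if (job == null) {
      System.out.println("Could not find job");
      return;
    }
    Counters counters = job.getCounters();
    if (counters == null) {
      // Retired jobs keep only status/profile in memory; counters are gone.
      System.out.println("Counters not available. Job is retired.");
    } else {
      System.out.println(counters);
    }
  }
}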
+ 34 - 29
src/mapred/org/apache/hadoop/mapred/JobHistory.java

@@ -135,15 +135,19 @@ public class JobHistory {
    
     private ThreadPoolExecutor executor = null;
     private final Configuration conf;
+    private final JobTracker jobTracker;
 
    // cache from job-key to files associated with it.
     private Map<JobID, FilesHolder> fileCache = 
       new ConcurrentHashMap<JobID, FilesHolder>();
 
-    JobHistoryFilesManager(Configuration conf) throws IOException {
+    JobHistoryFilesManager(Configuration conf, JobTracker jobTracker)
+        throws IOException {
       this.conf = conf;
+      this.jobTracker = jobTracker;
     }
 
+
     void start() {
       executor = new ThreadPoolExecutor(1, 3, 1, 
           TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
@@ -192,7 +196,22 @@ public class JobHistory {
       fileCache.remove(id);
     }
 
-    void moveToDone(final JobID id, final List<Path> paths) {
+    void moveToDone(final JobID id) {
+      final List<Path> paths = new ArrayList<Path>();
+      final Path historyFile = fileManager.getHistoryFile(id);
+      if (historyFile == null) {
+        LOG.info("No file for job-history with " + id + " found in cache!");
+      } else {
+        paths.add(historyFile);
+      }
+
+      final Path confPath = fileManager.getConfFileWriters(id);
+      if (confPath == null) {
+        LOG.info("No file for jobconf with " + id + " found in cache!");
+      } else {
+        paths.add(confPath);
+      }
+
       executor.execute(new Runnable() {
 
         public void run() {
@@ -208,12 +227,18 @@ public class JobHistory {
                     new FsPermission(HISTORY_FILE_PERMISSION));
               }
             }
-
-            //purge the job from the cache
-            fileManager.purgeJob(id);
           } catch (Throwable e) {
             LOG.error("Unable to move history file to DONE folder.", e);
           }
+          String historyFileDonePath = null;
+          if (historyFile != null) {
+            historyFileDonePath = new Path(DONE, 
+                historyFile.getName()).toString();
+          }
+          jobTracker.historyFileCopied(id, historyFileDonePath);
+          
+          //purge the job from the cache
+          fileManager.purgeJob(id);
         }
 
       });
@@ -261,8 +286,8 @@ public class JobHistory {
    * @return true if intialized properly
    *         false otherwise
    */
-  public static boolean init(JobConf conf, String hostname, 
-                              long jobTrackerStartTime){
+  public static boolean init(JobTracker jobTracker, JobConf conf,
+             String hostname, long jobTrackerStartTime){
     try {
       LOG_DIR = conf.get("hadoop.job.history.location" ,
         "file:///" + new File(
@@ -287,7 +312,7 @@ public class JobHistory {
       jtConf = conf;
 
       // initialize the file manager
-      fileManager = new JobHistoryFilesManager(conf);
+      fileManager = new JobHistoryFilesManager(conf, jobTracker);
     } catch(IOException e) {
         LOG.error("Failed to initialize JobHistory log file", e); 
         disableHistory = true;
@@ -1043,27 +1068,7 @@ public class JobHistory {
      * This *should* be the last call to jobhistory for a given job.
      */
      static void markCompleted(JobID id) throws IOException {
-       List<Path> paths = new ArrayList<Path>();
-       Path path = fileManager.getHistoryFile(id);
-       if (path == null) {
-         LOG.info("No file for job-history with " + id + " found in cache!");
-         return;
-       } else {
-         paths.add(path);
-       }
-
-       Path confPath = fileManager.getConfFileWriters(id);
-       if (confPath == null) {
-         LOG.info("No file for jobconf with " + id + " found in cache!");
-         return;
-       } else {
-         paths.add(confPath);
-       }
-
-       //move the job files to done folder and purge the job
-       if (paths.size() > 0) {
-         fileManager.moveToDone(id, paths);
-       }
+       fileManager.moveToDone(id);
      }
 
      /**

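The restructured moveToDone resolves the history and conf paths up front on the caller's thread, then hands the slow filesystem copy to the ThreadPoolExecutor; crucially, the Runnable calls jobTracker.historyFileCopied() after the try/catch, so the job becomes retirable even when the move fails. A generic, non-Hadoop sketch of that gather-then-submit pattern (all helper names are illustrative):

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class GatherThenSubmitExample {
  public static void main(String[] args) {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
        1, 3, 1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());

    // Resolve all inputs synchronously, while caller-side state is valid...
    final String source = lookupSource();

    // ...then do the slow I/O on the pool; the callback runs even on failure.
    executor.execute(new Runnable() {
      public void run() {
        try {
          copy(source);
        } catch (Throwable e) {
          System.err.println("copy failed: " + e);
        }
        notifyDone(source);  // always invoked, like historyFileCopied
      }
    });
    executor.shutdown();
  }

  static String lookupSource() { return "/tmp/history/job_0001"; }
  static void copy(String s) { /* slow operation elided */ }
  static void notifyDone(String s) { System.out.println("done: " + s); }
}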
+ 25 - 8
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -193,6 +193,8 @@ class JobInProgress {
   private boolean hasSpeculativeReduces;
   private long inputLength = 0;
   private String user;
+  private String historyFile = "";
+  private boolean historyFileCopied;
   
   // Per-job counters
   public static enum Counter { 
@@ -986,6 +988,22 @@ class JobInProgress {
     }
   }
 
+  String getHistoryFile() {
+    return historyFile;
+  }
+
+  synchronized void setHistoryFile(String file) {
+    this.historyFile = file;
+  }
+
+  boolean isHistoryFileCopied() {
+    return historyFileCopied;
+  }
+
+  synchronized void setHistoryFileCopied() {
+    this.historyFileCopied = true;
+  }
+  
   /**
    * Returns the job-level counters.
    * 
@@ -2324,11 +2342,13 @@ class JobInProgress {
   private synchronized void terminateJob(int jobTerminationState) {
     if ((status.getRunState() == JobStatus.RUNNING) ||
         (status.getRunState() == JobStatus.PREP)) {
+      this.finishTime = System.currentTimeMillis();
+      this.status.setMapProgress(1.0f);
+      this.status.setReduceProgress(1.0f);
+      this.status.setCleanupProgress(1.0f);
+      
       if (jobTerminationState == JobStatus.FAILED) {
-        this.status = new JobStatus(status.getJobID(),
-                                    1.0f, 1.0f, 1.0f, JobStatus.FAILED,
-                                    status.getJobPriority());
-        this.finishTime = System.currentTimeMillis();
+        this.status.setRunState(JobStatus.FAILED);
 
         // Log the job summary
         JobSummary.logJobSummary(this, jobtracker.getClusterStatus(false));
@@ -2338,10 +2358,7 @@ class JobInProgress {
                                      this.finishedMapTasks, 
                                      this.finishedReduceTasks);
       } else {
-        this.status = new JobStatus(status.getJobID(),
-                                    1.0f, 1.0f, 1.0f, JobStatus.KILLED,
-                                    status.getJobPriority());
-        this.finishTime = System.currentTimeMillis();
+        this.status.setRunState(JobStatus.KILLED);
 
         // Log the job summary
         JobSummary.logJobSummary(this, jobtracker.getClusterStatus(false));

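terminateJob used to replace this.status with a freshly constructed JobStatus, silently dropping any field the constructor did not take (start time, scheduling info); the patch mutates the existing object instead, forcing the progress fields to 1.0 and flipping only the run state. A toy illustration of why mutation preserves what reconstruction loses (the Status class here is invented for the example):

// Illustrative only: a toy status showing which fields a reconstructing
// constructor would silently reset.
class Status {
  final long startTime;        // set once at submission
  String schedulingInfo = "";  // updated while the job runs
  float mapProgress;
  int runState;

  Status(long startTime) { this.startTime = startTime; }
}

public class TerminatePatternExample {
  static final int FAILED = 3;

  public static void main(String[] args) {
    Status status = new Status(System.currentTimeMillis());
    status.schedulingInfo = "2 running map tasks";

    // The patch's approach: mutate, don't replace.
    status.mapProgress = 1.0f;
    status.runState = FAILED;

    // startTime and schedulingInfo are still intact here; rebuilding via
    // `new Status(...)` would have discarded both.
    System.out.println(status.startTime + " / " + status.schedulingInfo);
  }
}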
+ 141 - 95
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -32,6 +32,7 @@ import java.net.UnknownHostException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -40,6 +41,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -170,7 +172,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     * The minimum time (in ms) that a job's information has to remain
     * in the JobTracker's memory before it is retired.
     */
-  static final int MIN_TIME_BEFORE_RETIRE = 60000;
+  static final int MIN_TIME_BEFORE_RETIRE = 0;
 
 
   private int nextJobId = 1;
@@ -413,13 +415,88 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         
   }
 
+  synchronized void historyFileCopied(JobID jobid, String historyFile) {
+    JobInProgress job = getJob(jobid);
+    if (job != null) { //found in main cache
+      job.setHistoryFileCopied();
+      if (historyFile != null) {
+        job.setHistoryFile(historyFile);
+      }
+      return;
+    }
+    RetireJobInfo jobInfo = retireJobs.get(jobid);
+    if (jobInfo != null) { //found in retired cache
+      if (historyFile != null) {
+        jobInfo.setHistoryFile(historyFile);
+      }
+    }
+  }
+
+  static class RetireJobInfo {
+    final JobStatus status;
+    final JobProfile profile;
+    final long finishTime;
+    private String historyFile;
+    RetireJobInfo(JobStatus status, JobProfile profile, long finishTime, 
+        String historyFile) {
+      this.status = status;
+      this.profile = profile;
+      this.finishTime = finishTime;
+      this.historyFile = historyFile;
+    }
+    void setHistoryFile(String file) {
+      this.historyFile = file;
+    }
+    String getHistoryFile() {
+      return historyFile;
+    }
+  }
   ///////////////////////////////////////////////////////
   // Used to remove old finished Jobs that have been around for too long
   ///////////////////////////////////////////////////////
   class RetireJobs implements Runnable {
+    private final Map<JobID, RetireJobInfo> jobIDStatusMap = 
+      new HashMap<JobID, RetireJobInfo>();
+    private final LinkedList<RetireJobInfo> jobRetireInfoQ = 
+      new LinkedList<RetireJobInfo>();
     public RetireJobs() {
     }
 
+    synchronized void addToCache(JobInProgress job) {
+      RetireJobInfo info = new RetireJobInfo(job.getStatus(), 
+          job.getProfile(), job.getFinishTime(), job.getHistoryFile());
+      jobRetireInfoQ.add(info);
+      jobIDStatusMap.put(info.status.getJobID(), info);
+      if (jobRetireInfoQ.size() > retiredJobsCacheSize) {
+        RetireJobInfo removed = jobRetireInfoQ.remove();
+        jobIDStatusMap.remove(removed.status.getJobID());
+        LOG.info("Retired job removed from cache " + removed.status.getJobID());
+      }
+    }
+
+    synchronized RetireJobInfo get(JobID jobId) {
+      return jobIDStatusMap.get(jobId);
+    }
+
+    @SuppressWarnings("unchecked")
+    synchronized LinkedList<RetireJobInfo> getAll() {
+      return (LinkedList<RetireJobInfo>) jobRetireInfoQ.clone();
+    }
+
+    synchronized LinkedList<JobStatus> getAllJobStatus() {
+      LinkedList<JobStatus> list = new LinkedList<JobStatus>();
+      for (RetireJobInfo info : jobRetireInfoQ) {
+        list.add(info.status);
+      }
+      return list;
+    }
+
+    private boolean minConditionToRetire(JobInProgress job, long now) {
+      return job.getStatus().getRunState() != JobStatus.RUNNING &&
+          job.getStatus().getRunState() != JobStatus.PREP &&
+          (job.getFinishTime() + MIN_TIME_BEFORE_RETIRE < now) &&
+          job.isHistoryFileCopied();
+    }
     /**
      * The run method lives for the life of the JobTracker,
      * and removes Jobs that are not still running, but which
@@ -435,14 +512,35 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
 
           synchronized (jobs) {
             for(JobInProgress job: jobs.values()) {
-              if (job.getStatus().getRunState() != JobStatus.RUNNING &&
-                  job.getStatus().getRunState() != JobStatus.PREP &&
-                  (job.getFinishTime() + MIN_TIME_BEFORE_RETIRE < now) &&
+              if (minConditionToRetire(job, now) &&
                   (job.getFinishTime()  < retireBefore)) {
                 retiredJobs.add(job);
               }
             }
           }
+          synchronized (userToJobsMap) {
+            for (Map.Entry<String, ArrayList<JobInProgress>> entry : 
+                userToJobsMap.entrySet()) {
+              String user = entry.getKey();
+              ArrayList<JobInProgress> userJobs = entry.getValue();
+              Iterator<JobInProgress> it = userJobs.iterator();
+              while (it.hasNext() && 
+                  userJobs.size() > MAX_COMPLETE_USER_JOBS_IN_MEMORY) {
+                JobInProgress jobUser = it.next();
+                if (retiredJobs.contains(jobUser)) {
+                  it.remove();
+                } else if (minConditionToRetire(jobUser, now)) {
+                  LOG.info("User limit exceeded. Marking job: " + 
+                      jobUser.getJobID() + " for retire.");
+                  retiredJobs.add(jobUser);
+                  it.remove();
+                }
+              }
+              if (userJobs.isEmpty()) {
+                userToJobsMap.remove(user);
+              }
+            }
+          }
           if (!retiredJobs.isEmpty()) {
             synchronized (JobTracker.this) {
               synchronized (jobs) {
@@ -454,22 +552,13 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
                       l.jobRemoved(job);
                     }
                     String jobUser = job.getProfile().getUser();
-                    synchronized (userToJobsMap) {
-                      ArrayList<JobInProgress> userJobs =
-                        userToJobsMap.get(jobUser);
-                      synchronized (userJobs) {
-                        userJobs.remove(job);
-                      }
-                      if (userJobs.isEmpty()) {
-                        userToJobsMap.remove(jobUser);
-                      }
-                    }
                     LOG.info("Retired job with id: '" + 
                              job.getProfile().getJobID() + "' of user '" +
                              jobUser + "'");
 
                     // clean up job files from the local disk
                     JobHistory.JobInfo.cleanupJob(job.getProfile().getJobID());
+                    addToCache(job);
                   }
                 }
               }
@@ -1715,6 +1804,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   Thread expireTrackersThread = null;
   RetireJobs retireJobs = new RetireJobs();
   Thread retireJobsThread = null;
+  final int retiredJobsCacheSize;
   ExpireLaunchingTasks expireLaunchingTasks = new ExpireLaunchingTasks();
   Thread expireLaunchingTaskThread = new Thread(expireLaunchingTasks,
                                                 "expireLaunchingTasks");
@@ -1795,6 +1885,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       conf.getLong("mapred.tasktracker.expiry.interval", 10 * 60 * 1000);
     RETIRE_JOB_INTERVAL = conf.getLong("mapred.jobtracker.retirejob.interval", 24 * 60 * 60 * 1000);
     RETIRE_JOB_CHECK_INTERVAL = conf.getLong("mapred.jobtracker.retirejob.check", 60 * 1000);
+    retiredJobsCacheSize =
+             conf.getInt("mapred.job.tracker.retiredjobs.cache.size", 1000);
     MAX_COMPLETE_USER_JOBS_IN_MEMORY = conf.getInt("mapred.jobtracker.completeuserjobs.maximum", 100);
     MAX_BLACKLISTS_PER_TRACKER = 
         conf.getInt("mapred.max.tracker.blacklists", 4);
@@ -1864,7 +1956,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         tmpInfoPort == 0, conf);
     infoServer.setAttribute("job.tracker", this);
     // initialize history parameters.
-    boolean historyInitialized = JobHistory.init(conf, this.localMachine,
+    boolean historyInitialized = JobHistory.init(this, conf, this.localMachine,
                                                  this.startTime);
     
     infoServer.addServlet("reducegraph", "/taskgraph", TaskGraphServlet.class);
@@ -2375,76 +2467,16 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         }
       }
     }
-    
-    // Purge oldest jobs and keep at-most MAX_COMPLETE_USER_JOBS_IN_MEMORY jobs of a given user
-    // in memory; information about the purged jobs is available via
-    // JobHistory.
-    synchronized (jobs) {
-      synchronized (taskScheduler) {
-        synchronized (userToJobsMap) {
-          String jobUser = job.getProfile().getUser();
-          if (!userToJobsMap.containsKey(jobUser)) {
-            userToJobsMap.put(jobUser, 
-                              new ArrayList<JobInProgress>());
-          }
-          ArrayList<JobInProgress> userJobs = 
-            userToJobsMap.get(jobUser);
-          synchronized (userJobs) {
-            // Add the currently completed 'job'
-            userJobs.add(job);
-
-            // Check if we need to retire some jobs of this user
-            while (userJobs.size() > 
-                   MAX_COMPLETE_USER_JOBS_IN_MEMORY) {
-              JobInProgress rjob = userJobs.get(0);
-                
-              // Do not delete 'current'
-              // finished job just yet.
-              if (rjob == job) {
-                break;
-              }
 
-              // do not retire jobs that finished in the very recent past.
-              if (rjob.getFinishTime() + MIN_TIME_BEFORE_RETIRE > now) {
-                break;
-              }
-                
-              // Cleanup all datastructures
-              int rjobRunState = 
-                rjob.getStatus().getRunState();
-              if (rjobRunState == JobStatus.SUCCEEDED || 
-                  rjobRunState == JobStatus.FAILED ||
-                  rjobRunState == JobStatus.KILLED) {
-                // Ok, this call to removeTaskEntries
-                // is dangerous is some very very obscure
-                // cases; e.g. when rjob completed, hit
-                // MAX_COMPLETE_USER_JOBS_IN_MEMORY job
-                // limit and yet some task (taskid)
-                // wasn't complete!
-                removeJobTasks(rjob);
-                  
-                userJobs.remove(0);
-                jobs.remove(rjob.getProfile().getJobID());
-                for (JobInProgressListener listener : jobInProgressListeners) {
-                  listener.jobRemoved(rjob);
-                }
-                  
-                LOG.info("Retired job with id: '" + 
-                         rjob.getProfile().getJobID() + "' of user: '" +
-                         jobUser + "'");
-              } else {
-                // Do not remove jobs that aren't complete.
-                // Stop here, and let the next pass take
-                // care of purging jobs.
-                break;
-              }
-            }
-          }
-          if (userJobs.isEmpty()) {
-            userToJobsMap.remove(jobUser);
-          }
-        }
+    String jobUser = job.getProfile().getUser();
+    //add to the user to jobs mapping
+    synchronized (userToJobsMap) {
+      ArrayList<JobInProgress> userJobs = userToJobsMap.get(jobUser);
+      if (userJobs == null) {
+        userJobs =  new ArrayList<JobInProgress>();
+        userToJobsMap.put(jobUser, userJobs);
       }
+      userJobs.add(job);
     }
   }
 
@@ -3646,7 +3678,12 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       JobInProgress job = jobs.get(jobid);
       if (job != null) {
         return job.getProfile();
-      } 
+      }  else {
+        RetireJobInfo info = retireJobs.get(jobid);
+        if (info != null) {
+          return info.profile;
+        }
+      }
     }
     return completedJobStatusStore.readJobProfile(jobid);
   }
@@ -3659,7 +3696,13 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       JobInProgress job = jobs.get(jobid);
       if (job != null) {
         return job.getStatus();
-      } 
+      } else {
+        
+        RetireJobInfo info = retireJobs.get(jobid);
+        if (info != null) {
+          return info.status;
+        }
+      }
     }
     return completedJobStatusStore.readJobStatus(jobid);
   }
@@ -3798,19 +3841,19 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
    */
   public synchronized String[] getTaskDiagnostics(TaskAttemptID taskId)  
     throws IOException {
-    
+    List<String> taskDiagnosticInfo = null;
     JobID jobId = taskId.getJobID();
     TaskID tipId = taskId.getTaskID();
     JobInProgress job = jobs.get(jobId);
-    if (job == null) {
-      throw new IllegalArgumentException("Job " + jobId + " not found.");
-    }
-    TaskInProgress tip = job.getTaskInProgress(tipId);
-    if (tip == null) {
-      throw new IllegalArgumentException("TIP " + tipId + " not found.");
+    if (job != null) {
+      TaskInProgress tip = job.getTaskInProgress(tipId);
+      if (tip != null) {
+        taskDiagnosticInfo = tip.getDiagnosticInfo(taskId);
+      }
+      
     }
-    List<String> taskDiagnosticInfo = tip.getDiagnosticInfo(taskId);
-    return ((taskDiagnosticInfo == null) ? null 
+    
+    return ((taskDiagnosticInfo == null) ? new String[0] 
             : taskDiagnosticInfo.toArray(new String[0]));
   }
     
@@ -3879,7 +3922,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   } 
   
   public JobStatus[] getAllJobs() {
-    return getJobStatus(jobs.values(),false);
+    List<JobStatus> list = new ArrayList<JobStatus>();
+    list.addAll(Arrays.asList(getJobStatus(jobs.values(),false)));
+    list.addAll(retireJobs.getAllJobStatus());
+    return list.toArray(new JobStatus[list.size()]);
   }
     
   /**

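RetireJobs' new cache pairs a HashMap, for O(1) lookup by JobID, with a LinkedList acting as a FIFO queue for eviction order: addToCache appends, and past retiredJobsCacheSize (the new config key, default 1000) it removes from the head of both structures. A standalone sketch of that shape, storing generic keys in the queue rather than the RetireJobInfo objects themselves:

import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

// Bounded FIFO cache: map for lookup, queue for eviction order,
// mirroring JobTracker.RetireJobs (jobIDStatusMap + jobRetireInfoQ).
class FifoCache<K, V> {
  private final Map<K, V> byKey = new HashMap<K, V>();
  private final LinkedList<K> order = new LinkedList<K>();
  private final int capacity;

  FifoCache(int capacity) { this.capacity = capacity; }

  synchronized void add(K key, V value) {
    order.add(key);
    byKey.put(key, value);
    if (order.size() > capacity) {
      K evicted = order.remove();  // oldest insertion
      byKey.remove(evicted);
    }
  }

  synchronized V get(K key) { return byKey.get(key); }
}

Unlike the LRU cache in JSPUtil, lookups here do not reorder anything: retired jobs age out strictly in the order they were retired.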
+ 2 - 2
src/test/org/apache/hadoop/mapred/TestJobHistory.java

@@ -797,7 +797,7 @@ public class TestJobHistory extends TestCase {
       JobConf conf = new JobConf();
       // keep for less time
       conf.setLong("mapred.jobtracker.retirejob.check", 1000);
-      conf.setLong("mapred.jobtracker.retirejob.interval", 1000);
+      conf.setLong("mapred.jobtracker.retirejob.interval", 100000);
 
       //set the done folder location
       String doneFolder = "history_done";
@@ -886,7 +886,7 @@ public class TestJobHistory extends TestCase {
       JobConf conf = new JobConf();
       // keep for less time
       conf.setLong("mapred.jobtracker.retirejob.check", 1000);
-      conf.setLong("mapred.jobtracker.retirejob.interval", 1000);
+      conf.setLong("mapred.jobtracker.retirejob.interval", 100000);
 
       //set the done folder location
       String doneFolder = TEST_ROOT_DIR + "history_done";

+ 1 - 1
src/test/org/apache/hadoop/mapred/TestJobHistoryParsing.java

@@ -59,7 +59,7 @@ public class TestJobHistoryParsing  extends TestCase {
     JobConf conf = new JobConf();
     conf.set("hadoop.job.history.location", historyDir.toString());
     FileSystem fs = FileSystem.getLocal(new JobConf());
-    JobHistory.init(conf, "localhost", 1234);
+    JobHistory.init(null, conf, "localhost", 1234);
     Path historyLog = new Path(historyDir, "testlog");
     PrintWriter out = new PrintWriter(fs.create(historyLog));
     historyWriter.add(out);

+ 4 - 5
src/webapps/job/analysejobhistory.jsp

@@ -4,14 +4,12 @@
   import="java.io.*"
   import="java.util.*"
   import="org.apache.hadoop.mapred.*"
+  import="org.apache.hadoop.fs.*"
   import="org.apache.hadoop.util.*"
   import="java.text.SimpleDateFormat"
   import="org.apache.hadoop.mapred.JobHistory.*"
 %>
-<jsp:include page="loadhistory.jsp">
-  <jsp:param name="jobid" value="<%=request.getParameter("jobid") %>"/>
-  <jsp:param name="logFile" value="<%=request.getParameter("logFile") %>"/>
-</jsp:include>
+
 <%!	private static SimpleDateFormat dateFormat 
                               = new SimpleDateFormat("d/MM HH:mm:ss") ; 
 %>
@@ -25,7 +23,8 @@
   if (numTasks != null) {
     showTasks = Integer.parseInt(numTasks);  
   }
-  JobInfo job = (JobInfo)request.getSession().getAttribute("job");
+  FileSystem fs = (FileSystem) application.getAttribute("fileSys");
+  JobInfo job = JSPUtil.getJobInfo(request, fs);
 %>
 <h2>Hadoop Job <a href="jobdetailshistory.jsp?jobid=<%=jobid%>&&logFile=<%=encodedLogFileName%>"><%=jobid %> </a></h2>
 <b>User : </b> <%=job.get(Keys.USER) %><br/> 

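This and the following JSP hunks share one migration: instead of forwarding through loadhistory.jsp and stashing the parsed JobInfo and FileSystem in the per-user session, each page now reads the shared FileSystem from application scope (the ServletContext attribute "fileSys") and calls JSPUtil.getJobInfo, which parses the history file on a cache miss and serves repeats from the LRU cache. A servlet-style sketch of the same two calls; the servlet class is illustrative, and it lives in org.apache.hadoop.mapred because JSPUtil is package-private (as are the compiled JSPs that call it):

package org.apache.hadoop.mapred;

import java.io.IOException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobHistory.JobInfo;

public class HistoryLookupServlet extends HttpServlet {
  public void doGet(HttpServletRequest request, HttpServletResponse response)
      throws ServletException, IOException {
    // Application (ServletContext) scope: one shared FileSystem per webapp,
    // instead of the per-user session attribute loadhistory.jsp used to set.
    FileSystem fs = (FileSystem) getServletContext().getAttribute("fileSys");

    // Reads ?jobid= and ?logFile= from the request, parses on a cache miss,
    // and answers later requests from JSPUtil's LRU cache.
    JobInfo job = JSPUtil.getJobInfo(request, fs);
    response.getWriter().println(job.get(JobHistory.Keys.USER));
  }
}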
+ 1 - 1
src/webapps/job/jobconf_history.jsp

@@ -30,7 +30,7 @@
   Path logDir = new Path(request.getParameter("jobLogDir"));
   Path jobFilePath = new Path(logDir, 
                        request.getParameter("jobUniqueString") + "_conf.xml");
-  FileSystem fs = (FileSystem)request.getSession().getAttribute("fs");
+  FileSystem fs = (FileSystem) application.getAttribute("fileSys");
   FSDataInputStream jobFile = null; 
   try {
     jobFile = fs.open(jobFilePath);

+ 3 - 6
src/webapps/job/jobdetailshistory.jsp

@@ -9,10 +9,7 @@
   import="java.text.SimpleDateFormat"
   import="org.apache.hadoop.mapred.JobHistory.*"
 %>
-<jsp:include page="loadhistory.jsp">
-  <jsp:param name="jobid" value="<%=request.getParameter("jobid") %>"/>
-  <jsp:param name="logFile" value="<%=request.getParameter("logFile") %>"/>
-</jsp:include>
+
 <%! static SimpleDateFormat dateFormat = new SimpleDateFormat("d-MMM-yyyy HH:mm:ss") ; %>
 <%
     String jobid = request.getParameter("jobid");
@@ -23,8 +20,8 @@
     String[] jobDetails = jobFile.getName().split("_");
     String jobUniqueString = jobDetails[0] + "_" +jobDetails[1] + "_" + jobid ;
 	
-    JobInfo job = (JobInfo)request.getSession().getAttribute("job");
-    FileSystem fs = (FileSystem)request.getSession().getAttribute("fs");
+    FileSystem fs = (FileSystem) application.getAttribute("fileSys");
+    JobInfo job = JSPUtil.getJobInfo(request, fs);
 %>
 <html><body>
 <h2>Hadoop Job <%=jobid %> on <a href="jobhistory.jsp">History Viewer</a></h2>

+ 4 - 6
src/webapps/job/jobtaskshistory.jsp

@@ -4,14 +4,12 @@
   import="java.io.*"
   import="java.util.*"
   import="org.apache.hadoop.mapred.*"
+  import="org.apache.hadoop.fs.*"
   import="org.apache.hadoop.util.*"
   import="java.text.SimpleDateFormat"
   import="org.apache.hadoop.mapred.JobHistory.*"
 %>
-<jsp:include page="loadhistory.jsp">
-	<jsp:param name="jobid" value="<%=request.getParameter("jobid") %>"/>
-	<jsp:param name="logFile" value="<%=request.getParameter("logFile") %>"/>
-</jsp:include>
+
 <%!	
   private static SimpleDateFormat dateFormat =
                                     new SimpleDateFormat("d/MM HH:mm:ss") ; 
@@ -24,8 +22,8 @@
   String taskStatus = request.getParameter("status"); 
   String taskType = request.getParameter("taskType"); 
   
-  JobHistory.JobInfo job = (JobHistory.JobInfo)request.
-                            getSession().getAttribute("job");
+  FileSystem fs = (FileSystem) application.getAttribute("fileSys");
+  JobInfo job = JSPUtil.getJobInfo(request, fs);
   Map<String, JobHistory.Task> tasks = job.getAllTasks(); 
 %>
 <html>

+ 21 - 8
src/webapps/job/jobtracker.jsp

@@ -79,8 +79,7 @@
   <ul id="quicklinks-list">
     <li><a href="#scheduling_info">Scheduling Info</a></li>
     <li><a href="#running_jobs">Running Jobs</a></li>
-    <li><a href="#completed_jobs">Completed Jobs</a></li>
-    <li><a href="#failed_jobs">Failed Jobs</a></li>
+    <li><a href="#retired_jobs">Retired Jobs</a></li>
     <li><a href="#local_logs">Local Logs</a></li>
   </ul>
 </div>
@@ -135,13 +134,27 @@ for(JobQueueInfo queue: queues) {
 <%=JSPUtil.generateJobTable("Running", runningJobs, 30, 0)%>
 <hr>
 
-<h2 id="completed_jobs">Completed Jobs</h2>
-<%=JSPUtil.generateJobTable("Completed", completedJobs, 0, runningJobs.size())%>
-<hr>
+<%
+if (completedJobs.size() > 0) {
+  out.print("<h2 id=\"completed_jobs\">Completed Jobs</h2>");
+  out.print(JSPUtil.generateJobTable("Completed", completedJobs, 0, 
+    runningJobs.size()));
+  out.print("<hr>");
+}
+%>
+
+<%
+if (failedJobs.size() > 0) {
+  out.print("<h2 id=\"failed_jobs\">Failed Jobs</h2>");
+  out.print(JSPUtil.generateJobTable("Failed", failedJobs, 0, 
+    (runningJobs.size()+completedJobs.size())));
+  out.print("<hr>");
+}
+%>
 
-<h2 id="failed_jobs">Failed Jobs</h2>
-<%=JSPUtil.generateJobTable("Failed", failedJobs, 0, 
-    (runningJobs.size()+completedJobs.size()))%>
+<h2 id="retired_jobs">Retired Jobs</h2>
+<%=JSPUtil.generateRetiredJobTable(tracker, 
+  (runningJobs.size()+completedJobs.size()+failedJobs.size()))%>
 <hr>
 
 <h2 id="local_logs">Local Logs</h2>

+ 4 - 6
src/webapps/job/taskdetailshistory.jsp

@@ -4,14 +4,12 @@
   import="java.io.*"
   import="java.util.*"
   import="org.apache.hadoop.mapred.*"
+  import="org.apache.hadoop.fs.*"
   import="org.apache.hadoop.util.*"
   import="java.text.SimpleDateFormat"
   import="org.apache.hadoop.mapred.JobHistory.*"
 %>
-<jsp:include page="loadhistory.jsp">
-  <jsp:param name="jobid" value="<%=request.getParameter("jobid") %>"/>
-  <jsp:param name="jobTrackerId" value="<%=request.getParameter("jobTrackerId") %>"/>
-</jsp:include>
+
 <%!	private static SimpleDateFormat dateFormat = new SimpleDateFormat("d/MM HH:mm:ss") ; %>
 
 <%	
@@ -19,8 +17,8 @@
   String logFile = request.getParameter("logFile");
   String encodedLogFileName = JobHistory.JobInfo.encodeJobHistoryFilePath(logFile);
   String taskid = request.getParameter("taskid"); 
-  JobHistory.JobInfo job = (JobHistory.JobInfo)
-                              request.getSession().getAttribute("job");
+  FileSystem fs = (FileSystem) application.getAttribute("fileSys");
+  JobInfo job = JSPUtil.getJobInfo(request, fs);
   JobHistory.Task task = job.getAllTasks().get(taskid); 
   String type = task.get(Keys.TASK_TYPE);
 %>