
MAPREDUCE-4059. The history server should have a separate pluggable storage/query interface. (Robert Evans via tgraves).

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1311896 13f79535-47bb-0310-9956-ffa450edef68
Thomas Graves 13 years ago
parent
commit
cbb5f61090
18 changed files with 1398 additions and 965 deletions
  1. + 3 - 0    hadoop-mapreduce-project/CHANGES.txt
  2. + 18 - 0   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
  3. + 4 - 0    hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
  4. + 217 - 0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java
  5. + 5 - 0    hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryContext.java
  6. + 763 - 0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
  7. + 80 - 0   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryStorage.java
  8. + 150 - 879  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
  9. + 1 - 0    hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java
  10. + 1 - 0   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java
  11. + 33 - 79  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java
  12. + 9 - 1   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
  13. + 18 - 1  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServices.java
  14. + 19 - 1  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesAttempts.java
  15. + 19 - 1  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobConf.java
  16. + 20 - 1  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobs.java
  17. + 19 - 1  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobsQuery.java
  18. + 19 - 1  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesTasks.java

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -235,6 +235,9 @@ Release 0.23.3 - UNRELEASED
 
   IMPROVEMENTS
 
+    MAPREDUCE-4059. The history server should have a separate pluggable 
+    storage/query interface. (Robert Evans via tgraves)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 18 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java

@@ -44,6 +44,9 @@ public class JHAdminConfig {
   /** Run the History Cleaner every X ms.*/
   public static final String MR_HISTORY_CLEANER_INTERVAL_MS = 
     MR_HISTORY_PREFIX + "cleaner.interval-ms";
+  public static final long DEFAULT_MR_HISTORY_CLEANER_INTERVAL_MS = 
+    1 * 24 * 60 * 60 * 1000l; //1 day
+  
   
   /** The number of threads to handle client API requests.*/
   public static final String MR_HISTORY_CLIENT_THREAD_COUNT = 
@@ -56,7 +59,9 @@ public class JHAdminConfig {
    */
   public static final String MR_HISTORY_DATESTRING_CACHE_SIZE = 
     MR_HISTORY_PREFIX + "datestring.cache.size";
+  public static final int DEFAULT_MR_HISTORY_DATESTRING_CACHE_SIZE = 200000;
   
+  //TODO REMOVE debug-mode
   /** Equivalent to 0.20 mapreduce.jobhistory.debug.mode */
   public static final String MR_HISTORY_DEBUG_MODE = 
     MR_HISTORY_PREFIX + "debug-mode";
@@ -75,6 +80,7 @@ public class JHAdminConfig {
   /** Size of the job list cache.*/
   public static final String MR_HISTORY_JOBLIST_CACHE_SIZE =
     MR_HISTORY_PREFIX + "joblist.cache.size";
+  public static final int DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE = 20000;
   
   /** The location of the Kerberos keytab file.*/
   public static final String MR_HISTORY_KEYTAB = MR_HISTORY_PREFIX + "keytab";
@@ -82,6 +88,7 @@ public class JHAdminConfig {
   /** Size of the loaded job cache.*/
   public static final String MR_HISTORY_LOADED_JOB_CACHE_SIZE = 
     MR_HISTORY_PREFIX + "loadedjobs.cache.size";
+  public static final int DEFAULT_MR_HISTORY_LOADED_JOB_CACHE_SIZE = 5;
   
   /**
    * The maximum age of a job history file before it is deleted from the history
@@ -89,6 +96,8 @@ public class JHAdminConfig {
    */
   public static final String MR_HISTORY_MAX_AGE_MS =
     MR_HISTORY_PREFIX + "max-age-ms";
+  public static final long DEFAULT_MR_HISTORY_MAX_AGE = 
+    7 * 24 * 60 * 60 * 1000L; //1 week
   
   /**
   * Scan for history files to move from intermediate done dir to done dir
@@ -96,10 +105,13 @@ public class JHAdminConfig {
    */
   public static final String MR_HISTORY_MOVE_INTERVAL_MS = 
     MR_HISTORY_PREFIX + "move.interval-ms";
+  public static final long DEFAULT_MR_HISTORY_MOVE_INTERVAL_MS = 
+    3 * 60 * 1000l; //3 minutes
   
   /** The number of threads used to move files.*/
   public static final String MR_HISTORY_MOVE_THREAD_COUNT = 
     MR_HISTORY_PREFIX + "move.thread-count";
+  public static final int DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT = 3;
   
   /** The Kerberos principal for the history server.*/
   public static final String MR_HISTORY_PRINCIPAL = 
@@ -116,4 +128,10 @@ public class JHAdminConfig {
    */
   public static final String MR_HS_SECURITY_SERVICE_AUTHORIZATION =
       "security.mrhs.client.protocol.acl";
+  
+  /**
+   * The HistoryStorage class to use to cache history data.
+   */
+  public static final String MR_HISTORY_STORAGE =
+    MR_HISTORY_PREFIX + ".store.class";
 }

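The new MR_HISTORY_STORAGE key above is the hook that makes the storage layer pluggable: JobHistory reads it, falling back to the built-in CachedHistoryStorage, and instantiates whatever HistoryStorage implementation it names. As an illustrative sketch only (not part of this commit), and assuming a hypothetical implementation org.example.MyHistoryStorage is on the classpath, the wiring looks roughly like this:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.v2.hs.CachedHistoryStorage;
    import org.apache.hadoop.mapreduce.v2.hs.HistoryStorage;
    import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
    import org.apache.hadoop.util.ReflectionUtils;

    public class HistoryStorageConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Hypothetical class name; any implementation of HistoryStorage could be named here.
        conf.set(JHAdminConfig.MR_HISTORY_STORAGE, "org.example.MyHistoryStorage");

        // Mirrors how JobHistory.init() resolves the class, defaulting to
        // CachedHistoryStorage when the key is unset.
        HistoryStorage storage = ReflectionUtils.newInstance(conf.getClass(
            JHAdminConfig.MR_HISTORY_STORAGE, CachedHistoryStorage.class,
            HistoryStorage.class), conf);
        System.out.println("Loaded storage: " + storage.getClass().getName());
      }
    }

As the JobHistory.java diff further down shows, the server additionally calls init()/start() on the storage when it is a YARN Service, and then hands it the HistoryFileManager.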
+ 4 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java

@@ -31,6 +31,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
@@ -50,6 +52,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
 
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
 public class JobHistoryUtils {
   
   /**

+ 217 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java

@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.hs;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.MetaInfo;
+import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobInfo;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+/**
+ * Manages an in memory cache of parsed Job History files.
+ */
+public class CachedHistoryStorage extends AbstractService implements
+    HistoryStorage {
+  private static final Log LOG = LogFactory.getLog(CachedHistoryStorage.class);
+
+  private Map<JobId, Job> loadedJobCache = null;
+  // The number of loaded jobs.
+  private int loadedJobCacheSize;
+
+  private HistoryFileManager hsManager;
+
+  @Override
+  public void setHistoryFileManager(HistoryFileManager hsManager) {
+    this.hsManager = hsManager;
+  }
+
+  @SuppressWarnings("serial")
+  @Override
+  public void init(Configuration conf) throws YarnException {
+    LOG.info("CachedHistoryStorage Init");
+
+    loadedJobCacheSize = conf.getInt(
+        JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE,
+        JHAdminConfig.DEFAULT_MR_HISTORY_LOADED_JOB_CACHE_SIZE);
+
+    loadedJobCache = Collections.synchronizedMap(new LinkedHashMap<JobId, Job>(
+        loadedJobCacheSize + 1, 0.75f, true) {
+      @Override
+      public boolean removeEldestEntry(final Map.Entry<JobId, Job> eldest) {
+        return super.size() > loadedJobCacheSize;
+      }
+    });
+
+    super.init(conf);
+  }
+
+  public CachedHistoryStorage() {
+    super(CachedHistoryStorage.class.getName());
+  }
+
+  private Job loadJob(MetaInfo metaInfo) {
+    try {
+      Job job = hsManager.loadJob(metaInfo);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Adding " + job.getID() + " to loaded job cache");
+      }
+      loadedJobCache.put(job.getID(), job);
+      return job;
+    } catch (IOException e) {
+      throw new YarnException(
+          "Could not find/load job: " + metaInfo.getJobId(), e);
+    }
+  }
+
+  @Override
+  public synchronized Job getFullJob(JobId jobId) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Looking for Job " + jobId);
+    }
+    try {
+      Job result = loadedJobCache.get(jobId);
+      if (result == null) {
+        MetaInfo metaInfo = hsManager.getMetaInfo(jobId);
+        if (metaInfo != null) {
+          result = loadJob(metaInfo);
+        }
+      }
+      return result;
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
+  }
+
+  @Override
+  public Map<JobId, Job> getAllPartialJobs() {
+    LOG.debug("Called getAllPartialJobs()");
+    SortedMap<JobId, Job> result = new TreeMap<JobId, Job>();
+    try {
+      for (MetaInfo mi : hsManager.getAllMetaInfo()) {
+        if (mi != null) {
+          JobId id = mi.getJobId();
+          result.put(id, new PartialJob(mi.getJobIndexInfo(), id));
+        }
+      }
+    } catch (IOException e) {
+      LOG.warn("Error trying to scan for all MetaInfos", e);
+      throw new YarnException(e);
+    }
+    return result;
+  }
+
+  @Override
+  public void jobRemovedFromHDFS(JobId jobId) {
+    loadedJobCache.remove(jobId);
+  }
+
+  @Override
+  public JobsInfo getPartialJobs(Long offset, Long count, String user,
+      String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd,
+      JobState jobState) {
+    return getPartialJobs(getAllPartialJobs().values(), offset, count, user,
+        queue, sBegin, sEnd, fBegin, fEnd, jobState);
+  }
+
+  public static JobsInfo getPartialJobs(Collection<Job> jobs, Long offset,
+      Long count, String user, String queue, Long sBegin, Long sEnd,
+      Long fBegin, Long fEnd, JobState jobState) {
+    JobsInfo allJobs = new JobsInfo();
+
+    if (sBegin == null || sBegin < 0)
+      sBegin = 0l;
+    if (sEnd == null)
+      sEnd = Long.MAX_VALUE;
+    if (fBegin == null || fBegin < 0)
+      fBegin = 0l;
+    if (fEnd == null)
+      fEnd = Long.MAX_VALUE;
+    if (offset == null || offset < 0)
+      offset = 0l;
+    if (count == null)
+      count = Long.MAX_VALUE;
+
+    if (offset > jobs.size()) {
+      return allJobs;
+    }
+
+    long at = 0;
+    long end = offset + count - 1;
+    if (end < 0) { // due to overflow
+      end = Long.MAX_VALUE;
+    }
+    for (Job job : jobs) {
+      if (at > end) {
+        break;
+      }
+
+      // can't really validate queue is a valid one since queues could change
+      if (queue != null && !queue.isEmpty()) {
+        if (!job.getQueueName().equals(queue)) {
+          continue;
+        }
+      }
+
+      if (user != null && !user.isEmpty()) {
+        if (!job.getUserName().equals(user)) {
+          continue;
+        }
+      }
+
+      JobReport report = job.getReport();
+
+      if (report.getStartTime() < sBegin || report.getStartTime() > sEnd) {
+        continue;
+      }
+      if (report.getFinishTime() < fBegin || report.getFinishTime() > fEnd) {
+        continue;
+      }
+      if (jobState != null && jobState != report.getJobState()) {
+        continue;
+      }
+
+      at++;
+      if ((at - 1) < offset) {
+        continue;
+      }
+
+      JobInfo jobInfo = new JobInfo(job);
+
+      allJobs.add(jobInfo);
+    }
+    return allJobs;
+  }
+}

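The loadedJobCache built in CachedHistoryStorage.init() above is a bounded LRU cache: an access-ordered LinkedHashMap whose removeEldestEntry hook evicts the least recently used entry once the configured size is exceeded, wrapped in Collections.synchronizedMap. A standalone sketch of that idiom (strings stand in for JobId/Job, and the bound of 5 mirrors DEFAULT_MR_HISTORY_LOADED_JOB_CACHE_SIZE):

    import java.util.Collections;
    import java.util.LinkedHashMap;
    import java.util.Map;

    public class LruCacheSketch {
      public static void main(String[] args) {
        final int cacheSize = 5; // mirrors DEFAULT_MR_HISTORY_LOADED_JOB_CACHE_SIZE
        // accessOrder=true: get() moves an entry to the tail, so the head is the LRU entry.
        Map<String, String> cache = Collections.synchronizedMap(
            new LinkedHashMap<String, String>(cacheSize + 1, 0.75f, true) {
              @Override
              protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                return size() > cacheSize; // evict once the bound is exceeded
              }
            });
        for (int i = 0; i < 10; i++) {
          cache.put("job_" + i, "parsed job " + i);
        }
        System.out.println(cache.keySet()); // only the 5 most recently used jobs remain
      }
    }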
+ 5 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryContext.java

@@ -24,8 +24,13 @@ import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
 
 public interface HistoryContext extends AppContext {
 
   Map<JobId, Job> getAllJobs(ApplicationId appID);
+
+  JobsInfo getPartialJobs(Long offset, Long count, String user,
+      String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd, JobState jobState);
 }

+ 763 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java

@@ -0,0 +1,763 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.hs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapred.JobACLsManager;
+import org.apache.hadoop.mapreduce.jobhistory.JobSummary;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+/**
+ * This class provides a way to interact with history files in a thread safe
+ * manner.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class HistoryFileManager extends AbstractService {
+  private static final Log LOG = LogFactory.getLog(HistoryFileManager.class);
+  private static final Log SUMMARY_LOG = LogFactory.getLog(JobSummary.class);
+
+  private static String DONE_BEFORE_SERIAL_TAIL = JobHistoryUtils
+      .doneSubdirsBeforeSerialTail();
+
+  public static class MetaInfo {
+    private Path historyFile;
+    private Path confFile;
+    private Path summaryFile;
+    private JobIndexInfo jobIndexInfo;
+
+    public MetaInfo(Path historyFile, Path confFile, Path summaryFile,
+        JobIndexInfo jobIndexInfo) {
+      this.historyFile = historyFile;
+      this.confFile = confFile;
+      this.summaryFile = summaryFile;
+      this.jobIndexInfo = jobIndexInfo;
+    }
+
+    private Path getHistoryFile() {
+      return historyFile;
+    }
+
+    private Path getConfFile() {
+      return confFile;
+    }
+
+    private Path getSummaryFile() {
+      return summaryFile;
+    }
+
+    public JobIndexInfo getJobIndexInfo() {
+      return jobIndexInfo;
+    }
+
+    public JobId getJobId() {
+      return jobIndexInfo.getJobId();
+    }
+
+    private void setHistoryFile(Path historyFile) {
+      this.historyFile = historyFile;
+    }
+
+    private void setConfFile(Path confFile) {
+      this.confFile = confFile;
+    }
+
+    private void setSummaryFile(Path summaryFile) {
+      this.summaryFile = summaryFile;
+    }
+  }
+
+  /**
+   * Maps between a serial number (generated based on jobId) and the timestamp
+   * component(s) to which it belongs. Facilitates jobId based searches. If a
+   * jobId is not found in this list - it will not be found.
+   */
+  private final SortedMap<String, Set<String>> idToDateString = 
+    new TreeMap<String, Set<String>>();
+  // The number of entries in idToDateString
+  private int dateStringCacheSize;
+
+  // Maintains minimal details for recent jobs (parsed from history file name).
+  // Sorted on Job Completion Time.
+  private final SortedMap<JobId, MetaInfo> jobListCache = 
+    new ConcurrentSkipListMap<JobId, MetaInfo>();
+  // The number of jobs to maintain in the job list cache.
+  private int jobListCacheSize;
+
+  // Re-use existing MetaInfo objects if they exist for the specific JobId.
+  // (synchronization on MetaInfo)
+  // Check for existence of the object when using iterators.
+  private final SortedMap<JobId, MetaInfo> intermediateListCache = 
+    new ConcurrentSkipListMap<JobId, MetaInfo>();
+
+  // Maintains a list of known done subdirectories.
+  private final Set<Path> existingDoneSubdirs = new HashSet<Path>();
+
+  /**
+   * Maintains a mapping between intermediate user directories and the last
+   * known modification time.
+   */
+  private Map<String, Long> userDirModificationTimeMap = 
+    new HashMap<String, Long>();
+
+  private JobACLsManager aclsMgr;
+
+  private Configuration conf;
+
+  // TODO Remove me!!!!
+  private boolean debugMode;
+  private String serialNumberFormat;
+
+  private Path doneDirPrefixPath = null; // folder for completed jobs
+  private FileContext doneDirFc; // done Dir FileContext
+
+  private Path intermediateDoneDirPath = null; // Intermediate Done Dir Path
+  private FileContext intermediateDoneDirFc; // Intermediate Done Dir
+                                             // FileContext
+
+  public HistoryFileManager() {
+    super(HistoryFileManager.class.getName());
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    this.conf = conf;
+
+    debugMode = conf.getBoolean(JHAdminConfig.MR_HISTORY_DEBUG_MODE, false);
+    int serialNumberLowDigits = debugMode ? 1 : 3;
+    serialNumberFormat = ("%0"
+        + (JobHistoryUtils.SERIAL_NUMBER_DIRECTORY_DIGITS + serialNumberLowDigits)
+        + "d");
+
+    String doneDirPrefix = null;
+    doneDirPrefix = JobHistoryUtils
+        .getConfiguredHistoryServerDoneDirPrefix(conf);
+    try {
+      doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified(
+          new Path(doneDirPrefix));
+      doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf);
+      mkdir(doneDirFc, doneDirPrefixPath, new FsPermission(
+          JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION));
+    } catch (IOException e) {
+      throw new YarnException("Error creating done directory: ["
+          + doneDirPrefixPath + "]", e);
+    }
+
+    String intermediateDoneDirPrefix = null;
+    intermediateDoneDirPrefix = JobHistoryUtils
+        .getConfiguredHistoryIntermediateDoneDirPrefix(conf);
+    try {
+      intermediateDoneDirPath = FileContext.getFileContext(conf).makeQualified(
+          new Path(intermediateDoneDirPrefix));
+      intermediateDoneDirFc = FileContext.getFileContext(
+          intermediateDoneDirPath.toUri(), conf);
+      mkdir(intermediateDoneDirFc, intermediateDoneDirPath, new FsPermission(
+          JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS.toShort()));
+    } catch (IOException e) {
+      LOG.info("error creating done directory on dfs " + e);
+      throw new YarnException("Error creating intermediate done directory: ["
+          + intermediateDoneDirPath + "]", e);
+    }
+
+    this.aclsMgr = new JobACLsManager(conf);
+
+    jobListCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE,
+        JHAdminConfig.DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE);
+
+    dateStringCacheSize = conf.getInt(
+        JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE,
+        JHAdminConfig.DEFAULT_MR_HISTORY_DATESTRING_CACHE_SIZE);
+
+    super.init(conf);
+  }
+
+  private void mkdir(FileContext fc, Path path, FsPermission fsp)
+      throws IOException {
+    if (!fc.util().exists(path)) {
+      try {
+        fc.mkdir(path, fsp, true);
+
+        FileStatus fsStatus = fc.getFileStatus(path);
+        LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
+            + ", Expected: " + fsp.toShort());
+        if (fsStatus.getPermission().toShort() != fsp.toShort()) {
+          LOG.info("Explicitly setting permissions to : " + fsp.toShort()
+              + ", " + fsp);
+          fc.setPermission(path, fsp);
+        }
+      } catch (FileAlreadyExistsException e) {
+        LOG.info("Directory: [" + path + "] already exists.");
+      }
+    }
+  }
+
+  /**
+   * Populates index data structures. Should only be called at initialization
+   * times.
+   */
+  @SuppressWarnings("unchecked")
+  void initExisting() throws IOException {
+    LOG.info("Initializing Existing Jobs...");
+    List<FileStatus> timestampedDirList = findTimestampedDirectories();
+    Collections.sort(timestampedDirList);
+    for (FileStatus fs : timestampedDirList) {
+      // TODO Could verify the correct format for these directories.
+      addDirectoryToSerialNumberIndex(fs.getPath());
+      addDirectoryToJobListCache(fs.getPath());
+    }
+  }
+
+  private void removeDirectoryFromSerialNumberIndex(Path serialDirPath) {
+    String serialPart = serialDirPath.getName();
+    String timeStampPart = JobHistoryUtils
+        .getTimestampPartFromPath(serialDirPath.toString());
+    if (timeStampPart == null) {
+      LOG.warn("Could not find timestamp portion from path: "
+          + serialDirPath.toString() + ". Continuing with next");
+      return;
+    }
+    if (serialPart == null) {
+      LOG.warn("Could not find serial portion from path: "
+          + serialDirPath.toString() + ". Continuing with next");
+      return;
+    }
+    synchronized (idToDateString) {
+      // TODO make this thread safe without the synchronize
+      if (idToDateString.containsKey(serialPart)) {
+        Set<String> set = idToDateString.get(serialPart);
+        set.remove(timeStampPart);
+        if (set.isEmpty()) {
+          idToDateString.remove(serialPart);
+        }
+      }
+    }
+  }
+
+  private void addDirectoryToSerialNumberIndex(Path serialDirPath) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Adding " + serialDirPath + " to serial index");
+    }
+    String serialPart = serialDirPath.getName();
+    String timestampPart = JobHistoryUtils
+        .getTimestampPartFromPath(serialDirPath.toString());
+    if (timestampPart == null) {
+      LOG.warn("Could not find timestamp portion from path: " + serialDirPath
+          + ". Continuing with next");
+      return;
+    }
+    if (serialPart == null) {
+      LOG.warn("Could not find serial portion from path: "
+          + serialDirPath.toString() + ". Continuing with next");
+    }
+    addToSerialNumberIndex(serialPart, timestampPart);
+  }
+
+  private void addToSerialNumberIndex(String serialPart, String timestampPart) {
+    synchronized (idToDateString) {
+      // TODO make this thread safe without the synchronize
+      if (!idToDateString.containsKey(serialPart)) {
+        idToDateString.put(serialPart, new HashSet<String>());
+        if (idToDateString.size() > dateStringCacheSize) {
+          idToDateString.remove(idToDateString.firstKey());
+        }
+        Set<String> datePartSet = idToDateString.get(serialPart);
+        datePartSet.add(timestampPart);
+      }
+    }
+  }
+
+  private void addDirectoryToJobListCache(Path path) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Adding " + path + " to job list cache.");
+    }
+    List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(path,
+        doneDirFc);
+    for (FileStatus fs : historyFileList) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Adding in history for " + fs.getPath());
+      }
+      JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
+          .getName());
+      String confFileName = JobHistoryUtils
+          .getIntermediateConfFileName(jobIndexInfo.getJobId());
+      String summaryFileName = JobHistoryUtils
+          .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
+      MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
+          .getParent(), confFileName), new Path(fs.getPath().getParent(),
+          summaryFileName), jobIndexInfo);
+      addToJobListCache(metaInfo);
+    }
+  }
+
+  private static List<FileStatus> scanDirectory(Path path, FileContext fc,
+      PathFilter pathFilter) throws IOException {
+    path = fc.makeQualified(path);
+    List<FileStatus> jhStatusList = new ArrayList<FileStatus>();
+    RemoteIterator<FileStatus> fileStatusIter = fc.listStatus(path);
+    while (fileStatusIter.hasNext()) {
+      FileStatus fileStatus = fileStatusIter.next();
+      Path filePath = fileStatus.getPath();
+      if (fileStatus.isFile() && pathFilter.accept(filePath)) {
+        jhStatusList.add(fileStatus);
+      }
+    }
+    return jhStatusList;
+  }
+
+  private static List<FileStatus> scanDirectoryForHistoryFiles(Path path,
+      FileContext fc) throws IOException {
+    return scanDirectory(path, fc, JobHistoryUtils.getHistoryFileFilter());
+  }
+
+  /**
+   * Finds all history directories with a timestamp component by scanning the
+   * filesystem. Used when the JobHistory server is started.
+   * 
+   * @return
+   */
+  private List<FileStatus> findTimestampedDirectories() throws IOException {
+    List<FileStatus> fsList = JobHistoryUtils.localGlobber(doneDirFc,
+        doneDirPrefixPath, DONE_BEFORE_SERIAL_TAIL);
+    return fsList;
+  }
+
+  private void addToJobListCache(MetaInfo metaInfo) {
+    JobId jobId = metaInfo.getJobIndexInfo().getJobId();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Adding " + jobId + " to job list cache with "
+          + metaInfo.getJobIndexInfo());
+    }
+    jobListCache.put(jobId, metaInfo);
+    if (jobListCache.size() > jobListCacheSize) {
+      jobListCache.remove(jobListCache.firstKey());
+    }
+  }
+
+  /**
+   * Scans the intermediate directory to find user directories. Scans these for
+   * history files if the modification time for the directory has changed.
+   * 
+   * @throws IOException
+   */
+  private void scanIntermediateDirectory() throws IOException {
+    List<FileStatus> userDirList = JobHistoryUtils.localGlobber(
+        intermediateDoneDirFc, intermediateDoneDirPath, "");
+
+    for (FileStatus userDir : userDirList) {
+      String name = userDir.getPath().getName();
+      long newModificationTime = userDir.getModificationTime();
+      boolean shouldScan = false;
+      synchronized (userDirModificationTimeMap) {
+        if (!userDirModificationTimeMap.containsKey(name)
+            || newModificationTime > userDirModificationTimeMap.get(name)) {
+          shouldScan = true;
+          userDirModificationTimeMap.put(name, newModificationTime);
+        }
+      }
+      if (shouldScan) {
+        scanIntermediateDirectory(userDir.getPath());
+      }
+    }
+  }
+
+  /**
+   * Scans the specified path and populates the intermediate cache.
+   * 
+   * @param absPath
+   * @throws IOException
+   */
+  private void scanIntermediateDirectory(final Path absPath) throws IOException {
+    List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(absPath,
+        intermediateDoneDirFc);
+    for (FileStatus fs : fileStatusList) {
+      JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
+          .getName());
+      String confFileName = JobHistoryUtils
+          .getIntermediateConfFileName(jobIndexInfo.getJobId());
+      String summaryFileName = JobHistoryUtils
+          .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
+      MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
+          .getParent(), confFileName), new Path(fs.getPath().getParent(),
+          summaryFileName), jobIndexInfo);
+      if (!intermediateListCache.containsKey(jobIndexInfo.getJobId())) {
+        intermediateListCache.put(jobIndexInfo.getJobId(), metaInfo);
+      }
+    }
+  }
+
+  /**
+   * Searches the job history file FileStatus list for the specified JobId.
+   * 
+   * @param fileStatusList
+   *          fileStatus list of Job History Files.
+   * @param jobId
+   *          The JobId to find.
+   * @return A MetaInfo object for the jobId, null if not found.
+   * @throws IOException
+   */
+  private MetaInfo getJobMetaInfo(List<FileStatus> fileStatusList, JobId jobId)
+      throws IOException {
+    for (FileStatus fs : fileStatusList) {
+      JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
+          .getName());
+      if (jobIndexInfo.getJobId().equals(jobId)) {
+        String confFileName = JobHistoryUtils
+            .getIntermediateConfFileName(jobIndexInfo.getJobId());
+        String summaryFileName = JobHistoryUtils
+            .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
+        MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
+            .getParent(), confFileName), new Path(fs.getPath().getParent(),
+            summaryFileName), jobIndexInfo);
+        return metaInfo;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Scans old directories known by the idToDateString map for the specified
+   * jobId. If the number of directories is higher than the supported size of
+   * the idToDateString cache, the jobId will not be found.
+   * 
+   * @param jobId
+   *          the jobId.
+   * @return
+   * @throws IOException
+   */
+  private MetaInfo scanOldDirsForJob(JobId jobId) throws IOException {
+    int jobSerialNumber = JobHistoryUtils.jobSerialNumber(jobId);
+    String boxedSerialNumber = String.valueOf(jobSerialNumber);
+    Set<String> dateStringSet;
+    synchronized (idToDateString) {
+      Set<String> found = idToDateString.get(boxedSerialNumber);
+      if (found == null) {
+        return null;
+      } else {
+        dateStringSet = new HashSet<String>(found);
+      }
+    }
+    for (String timestampPart : dateStringSet) {
+      Path logDir = canonicalHistoryLogPath(jobId, timestampPart);
+      List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(logDir,
+          doneDirFc);
+      MetaInfo metaInfo = getJobMetaInfo(fileStatusList, jobId);
+      if (metaInfo != null) {
+        return metaInfo;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Checks for the existence of the job history file in the intermediate
+   * directory.
+   * 
+   * @param jobId
+   * @return
+   * @throws IOException
+   */
+  private MetaInfo scanIntermediateForJob(JobId jobId) throws IOException {
+    scanIntermediateDirectory();
+    return intermediateListCache.get(jobId);
+  }
+
+  /**
+   * Parse a job from the JobHistoryFile, if the underlying file is not going to
+   * be deleted.
+   * 
+   * @param metaInfo
+   *          where the JobHistory is stored.
+   * @return the Job or null if the underlying file was deleted.
+   * @throws IOException
+   *           if there is an error trying to read the file.
+   */
+  public Job loadJob(MetaInfo metaInfo) throws IOException {
+    return new CompletedJob(conf, metaInfo.getJobIndexInfo().getJobId(),
+        metaInfo.getHistoryFile(), false, metaInfo.getJobIndexInfo().getUser(),
+        metaInfo.getConfFile(), aclsMgr);
+  }
+
+  public Collection<MetaInfo> getAllMetaInfo() throws IOException {
+    scanIntermediateDirectory();
+    ArrayList<MetaInfo> result = new ArrayList<MetaInfo>();
+    result.addAll(intermediateListCache.values());
+    result.addAll(jobListCache.values());
+    return result;
+  }
+
+  Collection<MetaInfo> getIntermediateMetaInfos() throws IOException {
+    scanIntermediateDirectory();
+    return intermediateListCache.values();
+  }
+
+  public MetaInfo getMetaInfo(JobId jobId) throws IOException {
+    // MetaInfo available in cache.
+    MetaInfo metaInfo = null;
+    if (jobListCache.containsKey(jobId)) {
+      metaInfo = jobListCache.get(jobId);
+    }
+
+    if (metaInfo != null) {
+      return metaInfo;
+    }
+
+    // MetaInfo not available. Check intermediate directory for meta info.
+    metaInfo = scanIntermediateForJob(jobId);
+    if (metaInfo != null) {
+      return metaInfo;
+    }
+
+    // Intermediate directory does not contain job. Search through older ones.
+    metaInfo = scanOldDirsForJob(jobId);
+    if (metaInfo != null) {
+      return metaInfo;
+    }
+    return null;
+  }
+
+  void moveToDone(MetaInfo metaInfo) throws IOException {
+    long completeTime = metaInfo.getJobIndexInfo().getFinishTime();
+    if (completeTime == 0)
+      completeTime = System.currentTimeMillis();
+    JobId jobId = metaInfo.getJobIndexInfo().getJobId();
+
+    List<Path> paths = new ArrayList<Path>();
+    Path historyFile = metaInfo.getHistoryFile();
+    if (historyFile == null) {
+      LOG.info("No file for job-history with " + jobId + " found in cache!");
+    } else {
+      paths.add(historyFile);
+    }
+
+    Path confFile = metaInfo.getConfFile();
+    if (confFile == null) {
+      LOG.info("No file for jobConf with " + jobId + " found in cache!");
+    } else {
+      paths.add(confFile);
+    }
+
+    // TODO Check all mi getters and setters for the conf path
+    Path summaryFile = metaInfo.getSummaryFile();
+    if (summaryFile == null) {
+      LOG.info("No summary file for job: " + jobId);
+    } else {
+      try {
+        String jobSummaryString = getJobSummary(intermediateDoneDirFc,
+            summaryFile);
+        SUMMARY_LOG.info(jobSummaryString);
+        LOG.info("Deleting JobSummary file: [" + summaryFile + "]");
+        intermediateDoneDirFc.delete(summaryFile, false);
+        metaInfo.setSummaryFile(null);
+      } catch (IOException e) {
+        LOG.warn("Failed to process summary file: [" + summaryFile + "]");
+        throw e;
+      }
+    }
+
+    Path targetDir = canonicalHistoryLogPath(jobId, completeTime);
+    addDirectoryToSerialNumberIndex(targetDir);
+    try {
+      makeDoneSubdir(targetDir);
+    } catch (IOException e) {
+      LOG.warn("Failed creating subdirectory: " + targetDir
+          + " while attempting to move files for jobId: " + jobId);
+      throw e;
+    }
+    synchronized (metaInfo) {
+      if (historyFile != null) {
+        Path toPath = doneDirFc.makeQualified(new Path(targetDir, historyFile
+            .getName()));
+        try {
+          moveToDoneNow(historyFile, toPath);
+        } catch (IOException e) {
+          LOG.warn("Failed to move file: " + historyFile + " for jobId: "
+              + jobId);
+          throw e;
+        }
+        metaInfo.setHistoryFile(toPath);
+      }
+      if (confFile != null) {
+        Path toPath = doneDirFc.makeQualified(new Path(targetDir, confFile
+            .getName()));
+        try {
+          moveToDoneNow(confFile, toPath);
+        } catch (IOException e) {
+          LOG.warn("Failed to move file: " + historyFile + " for jobId: "
+              + jobId);
+          throw e;
+        }
+        metaInfo.setConfFile(toPath);
+      }
+    }
+    addToJobListCache(metaInfo);
+    intermediateListCache.remove(jobId);
+  }
+
+  private void moveToDoneNow(final Path src, final Path target)
+      throws IOException {
+    LOG.info("Moving " + src.toString() + " to " + target.toString());
+    intermediateDoneDirFc.rename(src, target, Options.Rename.NONE);
+  }
+
+  private String getJobSummary(FileContext fc, Path path) throws IOException {
+    Path qPath = fc.makeQualified(path);
+    FSDataInputStream in = fc.open(qPath);
+    String jobSummaryString = in.readUTF();
+    in.close();
+    return jobSummaryString;
+  }
+
+  private void makeDoneSubdir(Path path) throws IOException {
+    boolean existsInExistingCache = false;
+    synchronized (existingDoneSubdirs) {
+      if (existingDoneSubdirs.contains(path))
+        existsInExistingCache = true;
+    }
+    try {
+      doneDirFc.getFileStatus(path);
+      if (!existsInExistingCache) {
+        existingDoneSubdirs.add(path);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("JobHistory.maybeMakeSubdirectory -- We believed " + path
+              + " already existed, but it didn't.");
+        }
+      }
+    } catch (FileNotFoundException fnfE) {
+      try {
+        FsPermission fsp = new FsPermission(
+            JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION);
+        doneDirFc.mkdir(path, fsp, true);
+        FileStatus fsStatus = doneDirFc.getFileStatus(path);
+        LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
+            + ", Expected: " + fsp.toShort());
+        if (fsStatus.getPermission().toShort() != fsp.toShort()) {
+          LOG.info("Explicitly setting permissions to : " + fsp.toShort()
+              + ", " + fsp);
+          doneDirFc.setPermission(path, fsp);
+        }
+        synchronized (existingDoneSubdirs) {
+          existingDoneSubdirs.add(path);
+        }
+      } catch (FileAlreadyExistsException faeE) { 
+        // Nothing to do.
+      }
+    }
+  }
+
+  private Path canonicalHistoryLogPath(JobId id, String timestampComponent) {
+    return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory(
+        id, timestampComponent, serialNumberFormat));
+  }
+
+  private Path canonicalHistoryLogPath(JobId id, long millisecondTime) {
+    String timestampComponent = JobHistoryUtils.timestampDirectoryComponent(
+        millisecondTime, debugMode);
+    return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory(
+        id, timestampComponent, serialNumberFormat));
+  }
+
+  private long getEffectiveTimestamp(long finishTime, FileStatus fileStatus) {
+    if (finishTime == 0) {
+      return fileStatus.getModificationTime();
+    }
+    return finishTime;
+  }
+
+  private void deleteJobFromDone(MetaInfo metaInfo) throws IOException {
+    jobListCache.remove(metaInfo.getJobId());
+    doneDirFc.delete(doneDirFc.makeQualified(metaInfo.getHistoryFile()), false);
+    doneDirFc.delete(doneDirFc.makeQualified(metaInfo.getConfFile()), false);
+  }
+
+  @SuppressWarnings("unchecked")
+  void clean(long cutoff, HistoryStorage storage) throws IOException {
+    // TODO this should be replaced by something that knows about the directory
+    // structure and will put less of a load on HDFS.
+    boolean halted = false;
+    // TODO Delete YYYY/MM/DD directories.
+    List<FileStatus> serialDirList = findTimestampedDirectories();
+    // Sort in ascending order. Relies on YYYY/MM/DD/Serial
+    Collections.sort(serialDirList);
+    for (FileStatus serialDir : serialDirList) {
+      List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(
+          serialDir.getPath(), doneDirFc);
+      for (FileStatus historyFile : historyFileList) {
+        JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(historyFile
+            .getPath().getName());
+        long effectiveTimestamp = getEffectiveTimestamp(
+            jobIndexInfo.getFinishTime(), historyFile);
+        if (effectiveTimestamp <= cutoff) {
+          String confFileName = JobHistoryUtils
+              .getIntermediateConfFileName(jobIndexInfo.getJobId());
+          MetaInfo metaInfo = new MetaInfo(historyFile.getPath(), new Path(
+              historyFile.getPath().getParent(), confFileName), null,
+              jobIndexInfo);
+          storage.jobRemovedFromHDFS(metaInfo.getJobId());
+          deleteJobFromDone(metaInfo);
+        } else {
+          halted = true;
+          break;
+        }
+      }
+      if (!halted) {
+        doneDirFc.delete(doneDirFc.makeQualified(serialDir.getPath()), true);
+        removeDirectoryFromSerialNumberIndex(serialDir.getPath());
+        synchronized (existingDoneSubdirs) {
+          existingDoneSubdirs.remove(serialDir.getPath());
+        }
+      } else {
+        break; // Don't scan any more directories.
+      }
+    }
+  }
+}

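HistoryFileManager bounds both of its in-memory indexes the same way: when jobListCache (a ConcurrentSkipListMap) or idToDateString grows past its configured size, the entry with the smallest key is dropped via firstKey()/remove(). A toy sketch of that bounding idiom (integers stand in for JobIds, and the limit stands in for MR_HISTORY_JOBLIST_CACHE_SIZE):

    import java.util.SortedMap;
    import java.util.concurrent.ConcurrentSkipListMap;

    public class BoundedSortedCacheSketch {
      public static void main(String[] args) {
        final int limit = 3; // stands in for MR_HISTORY_JOBLIST_CACHE_SIZE
        SortedMap<Integer, String> cache = new ConcurrentSkipListMap<Integer, String>();
        for (int id = 1; id <= 6; id++) {
          cache.put(id, "meta info for job " + id);
          // Same idiom as addToJobListCache(): drop the smallest key once over the limit.
          if (cache.size() > limit) {
            cache.remove(cache.firstKey());
          }
        }
        System.out.println(cache.keySet()); // prints [4, 5, 6]
      }
    }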
+ 80 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryStorage.java

@@ -0,0 +1,80 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.hs;
+
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Provides an API to query jobs that have finished. 
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface HistoryStorage {
+  
+  /**
+   * Give the Storage a reference to a class that can be used to interact with
+   * history files.
+   * @param hsManager the class that is used to interact with history files.
+   */
+  void setHistoryFileManager(HistoryFileManager hsManager);
+  
+  /**
+   * Look for a set of partial jobs.
+   * @param offset the offset into the list of jobs.
+   * @param count the maximum number of jobs to return.
+   * @param user only return jobs for the given user.
+   * @param queue only return jobs in the given queue.
+   * @param sBegin only return Jobs that started on or after the given time.
+   * @param sEnd only return Jobs that started on or before the given time.
+   * @param fBegin only return Jobs that ended on or after the given time.
+   * @param fEnd only return Jobs that ended on or before the given time.
+   * @param jobState only return Jobs that are in the given job state.
+   * @return The list of filtered jobs.
+   */
+  JobsInfo getPartialJobs(Long offset, Long count, String user, 
+      String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd, 
+      JobState jobState);
+  
+  /**
+   * Get all of the cached jobs.  This only returns partial jobs and is here for
+   * legacy reasons.
+   * @return all of the cached jobs
+   */
+  Map<JobId, Job> getAllPartialJobs();
+  
+  /**
+   * Get a fully parsed job.
+   * @param jobId the id of the job
+   * @return the job, or null if it is not found.
+   */
+  Job getFullJob(JobId jobId);
+
+  /**
+   * Informs the Storage that a job has been removed from HDFS
+   * @param jobId the ID of the job that was removed.
+   */
+  void jobRemovedFromHDFS(JobId jobId);
+}

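To show the shape of the plug-in point, here is a minimal, hypothetical implementation of the HistoryStorage interface above. It is only a sketch, not code from the commit: it stores nothing and answers every query with an empty result. A real alternative backend, for example one querying a database instead of the in-memory caches, would fill in the same five methods:

    import java.util.Collections;
    import java.util.Map;

    import org.apache.hadoop.mapreduce.v2.api.records.JobId;
    import org.apache.hadoop.mapreduce.v2.api.records.JobState;
    import org.apache.hadoop.mapreduce.v2.app.job.Job;
    import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager;
    import org.apache.hadoop.mapreduce.v2.hs.HistoryStorage;
    import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;

    public class EmptyHistoryStorage implements HistoryStorage {
      private HistoryFileManager hsManager;

      @Override
      public void setHistoryFileManager(HistoryFileManager hsManager) {
        this.hsManager = hsManager; // handed over by JobHistory after construction
      }

      @Override
      public JobsInfo getPartialJobs(Long offset, Long count, String user,
          String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd,
          JobState jobState) {
        return new JobsInfo(); // a real backend would apply the filters to its own store
      }

      @Override
      public Map<JobId, Job> getAllPartialJobs() {
        return Collections.<JobId, Job>emptyMap();
      }

      @Override
      public Job getFullJob(JobId jobId) {
        return null; // null signals "not found", per the interface contract above
      }

      @Override
      public void jobRemovedFromHDFS(JobId jobId) {
        // nothing is cached here, so there is nothing to invalidate
      }
    }

Such a class would be selected through JHAdminConfig.MR_HISTORY_STORAGE, as sketched after the JHAdminConfig diff above.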
+ 150 - 879
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java

@@ -1,36 +1,26 @@
 /**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.hadoop.mapreduce.v2.hs;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
@@ -41,26 +31,16 @@ import java.util.regex.Pattern;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.mapred.JobACLsManager;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
-import org.apache.hadoop.mapreduce.jobhistory.JobSummary;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
-import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.MetaInfo;
+import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
-import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
-import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.ClusterInfo;
 import org.apache.hadoop.yarn.YarnException;
@@ -69,106 +49,36 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.service.Service;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
-
-/*
+/**
  * Loads and manages the Job history cache.
  */
-public class JobHistory extends AbstractService implements HistoryContext   {
-
-  private static final int DEFAULT_JOBLIST_CACHE_SIZE = 20000;
-  private static final int DEFAULT_LOADEDJOB_CACHE_SIZE = 5;
-  private static final int DEFAULT_DATESTRING_CACHE_SIZE = 200000;
-  private static final long DEFAULT_MOVE_THREAD_INTERVAL = 3 * 60 * 1000l; //3 minutes
-  private static final int DEFAULT_MOVE_THREAD_COUNT = 3;
-  
-  static final long DEFAULT_HISTORY_MAX_AGE = 7 * 24 * 60 * 60 * 1000L; //1 week
-  static final long DEFAULT_RUN_INTERVAL = 1 * 24 * 60 * 60 * 1000l; //1 day
-  
+public class JobHistory extends AbstractService implements HistoryContext {
   private static final Log LOG = LogFactory.getLog(JobHistory.class);
 
-  private static final Log SUMMARY_LOG = LogFactory.getLog(JobSummary.class);
-  public static final Pattern CONF_FILENAME_REGEX =
-    Pattern.compile("(" + JobID.JOBID_REGEX + ")_conf.xml(?:\\.[0-9]+\\.old)?");
+  public static final Pattern CONF_FILENAME_REGEX = Pattern.compile("("
+      + JobID.JOBID_REGEX + ")_conf.xml(?:\\.[0-9]+\\.old)?");
   public static final String OLD_SUFFIX = ".old";
 
-  private static String DONE_BEFORE_SERIAL_TAIL = 
-    JobHistoryUtils.doneSubdirsBeforeSerialTail();
-  
-  /**
-   * Maps between a serial number (generated based on jobId) and the timestamp
-   * component(s) to which it belongs.
-   * Facilitates jobId based searches.
-   * If a jobId is not found in this list - it will not be found.
-   */
-  private final SortedMap<String, Set<String>> idToDateString = 
-    new ConcurrentSkipListMap<String, Set<String>>();
-
-  //Maintains minimal details for recent jobs (parsed from history file name).
-  //Sorted on Job Completion Time.
-  private final SortedMap<JobId, MetaInfo> jobListCache = 
-    new ConcurrentSkipListMap<JobId, MetaInfo>();
-  
-  
-  // Re-use exisiting MetaInfo objects if they exist for the specific JobId. (synchronization on MetaInfo)
-  // Check for existance of the object when using iterators.
-  private final SortedMap<JobId, MetaInfo> intermediateListCache = 
-    new ConcurrentSkipListMap<JobId, JobHistory.MetaInfo>();
-  
-  //Maintains a list of known done subdirectories. Not currently used.
-  private final Set<Path> existingDoneSubdirs = new HashSet<Path>();
-
-  private Map<JobId, Job> loadedJobCache = null;
-
-  /**
-   * Maintains a mapping between intermediate user directories and the last 
-   * known modification time.
-   */
-  private Map<String, Long> userDirModificationTimeMap = 
-    new HashMap<String, Long>();
-  
-  //The number of jobs to maintain in the job list cache.
-  private int jobListCacheSize;
-  
-  private JobACLsManager aclsMgr;
-  
-  //The number of loaded jobs.
-  private int loadedJobCacheSize;
-  
-  //The number of entries in idToDateString
-  private int dateStringCacheSize;
-
-  //Time interval for the move thread.
+  // Time interval for the move thread.
   private long moveThreadInterval;
-  
-  //Number of move threads.
-  private int numMoveThreads;
-  
-  private Configuration conf;
 
-  private boolean debugMode;
-  private int serialNumberLowDigits;
-  private String serialNumberFormat;
-  
+  // Number of move threads.
+  private int numMoveThreads;
 
-  private Path doneDirPrefixPath = null; // folder for completed jobs
-  private FileContext doneDirFc; // done Dir FileContext
-  
-  private Path intermediateDoneDirPath = null; //Intermediate Done Dir Path
-  private FileContext intermediateDoneDirFc; //Intermediate Done Dir FileContext
+  private Configuration conf;
 
   private Thread moveIntermediateToDoneThread = null;
   private MoveIntermediateToDoneRunnable moveIntermediateToDoneRunnable = null;
+
   private ScheduledThreadPoolExecutor cleanerScheduledExecutor = null;
-  
-  /**
-   * Writes out files to the path
-   * .....${DONE_DIR}/VERSION_STRING/YYYY/MM/DD/HH/SERIAL_NUM/jh{index_entries}.jhist
-   */
 
-  @SuppressWarnings("serial")
+  private HistoryStorage storage = null;
+  private HistoryFileManager hsManager = null;
+
   @Override
   public void init(Configuration conf) throws YarnException {
     LOG.info("JobHistory Init");
@@ -176,121 +86,66 @@ public class JobHistory extends AbstractService implements HistoryContext   {
     this.appID = RecordFactoryProvider.getRecordFactory(conf)
         .newRecordInstance(ApplicationId.class);
     this.appAttemptID = RecordFactoryProvider.getRecordFactory(conf)
-    .newRecordInstance(ApplicationAttemptId.class);
-
-    debugMode = conf.getBoolean(JHAdminConfig.MR_HISTORY_DEBUG_MODE, false);
-    serialNumberLowDigits = debugMode ? 1 : 3;
-    serialNumberFormat = ("%0"
-        + (JobHistoryUtils.SERIAL_NUMBER_DIRECTORY_DIGITS 
-            + serialNumberLowDigits) + "d");
+        .newRecordInstance(ApplicationAttemptId.class);
 
-    String doneDirPrefix = null;
-    doneDirPrefix = JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
-    try {
-      doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified(
-          new Path(doneDirPrefix));
-      doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf);
-      mkdir(doneDirFc, doneDirPrefixPath, new FsPermission(
-          JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION));
-    } catch (IOException e) {
-      throw new YarnException("Error creating done directory: [" +
-          doneDirPrefixPath + "]", e);
-    }
-
-    String intermediateDoneDirPrefix = null;
-    intermediateDoneDirPrefix = JobHistoryUtils
-        .getConfiguredHistoryIntermediateDoneDirPrefix(conf);
-    try {
-      intermediateDoneDirPath = FileContext.getFileContext(conf)
-          .makeQualified(new Path(intermediateDoneDirPrefix));
-      intermediateDoneDirFc = FileContext.getFileContext(
-          intermediateDoneDirPath.toUri(), conf);
-      mkdir(intermediateDoneDirFc, intermediateDoneDirPath, new FsPermission(
-          JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS.toShort()));
-    } catch (IOException e) {
-      LOG.info("error creating done directory on dfs " + e);
-      throw new YarnException("Error creating intermediate done directory: [" 
-          + intermediateDoneDirPath + "]", e);
-    }
-    
-    this.aclsMgr = new JobACLsManager(conf);
-    
-    jobListCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE,
-        DEFAULT_JOBLIST_CACHE_SIZE);
-    loadedJobCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE,
-        DEFAULT_LOADEDJOB_CACHE_SIZE);
-    dateStringCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE,
-        DEFAULT_DATESTRING_CACHE_SIZE);
-    moveThreadInterval =
-        conf.getLong(JHAdminConfig.MR_HISTORY_MOVE_INTERVAL_MS,
-            DEFAULT_MOVE_THREAD_INTERVAL);
+    moveThreadInterval = conf.getLong(
+        JHAdminConfig.MR_HISTORY_MOVE_INTERVAL_MS,
+        JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_INTERVAL_MS);
     numMoveThreads = conf.getInt(JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT,
-        DEFAULT_MOVE_THREAD_COUNT);
-    
-    loadedJobCache =
-        Collections.synchronizedMap(new LinkedHashMap<JobId, Job>(
-            loadedJobCacheSize + 1, 0.75f, true) {
-          @Override
-          public boolean removeEldestEntry(final Map.Entry<JobId, Job> eldest) {
-            return super.size() > loadedJobCacheSize;
-          }
-        });
-    
+        JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT);
+
+    hsManager = new HistoryFileManager();
+    hsManager.init(conf);
     try {
-      initExisting();
+      hsManager.initExisting();
     } catch (IOException e) {
       throw new YarnException("Failed to intialize existing directories", e);
     }
-    super.init(conf);
-  }
-  
-  private void mkdir(FileContext fc, Path path, FsPermission fsp)
-      throws IOException {
-    if (!fc.util().exists(path)) {
-      try {
-        fc.mkdir(path, fsp, true);
 
-        FileStatus fsStatus = fc.getFileStatus(path);
-        LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
-            + ", Expected: " + fsp.toShort());
-        if (fsStatus.getPermission().toShort() != fsp.toShort()) {
-          LOG.info("Explicitly setting permissions to : " + fsp.toShort()
-              + ", " + fsp);
-          fc.setPermission(path, fsp);
-        }
-      } catch (FileAlreadyExistsException e) {
-        LOG.info("Directory: [" + path + "] already exists.");
-      }
+    storage = ReflectionUtils.newInstance(conf.getClass(
+        JHAdminConfig.MR_HISTORY_STORAGE, CachedHistoryStorage.class,
+        HistoryStorage.class), conf);
+    if (storage instanceof Service) {
+      ((Service) storage).init(conf);
     }
+    storage.setHistoryFileManager(hsManager);
+
+    super.init(conf);
   }
 
   @Override
   public void start() {
-    //Start moveIntermediatToDoneThread
-    moveIntermediateToDoneRunnable = 
-      new MoveIntermediateToDoneRunnable(moveThreadInterval, numMoveThreads);
+    hsManager.start();
+    if (storage instanceof Service) {
+      ((Service) storage).start();
+    }
+
+    // Start moveIntermediatToDoneThread
+    moveIntermediateToDoneRunnable = new MoveIntermediateToDoneRunnable(
+        moveThreadInterval, numMoveThreads);
     moveIntermediateToDoneThread = new Thread(moveIntermediateToDoneRunnable);
     moveIntermediateToDoneThread.setName("MoveIntermediateToDoneScanner");
     moveIntermediateToDoneThread.start();
-    
-    //Start historyCleaner
+
+    // Start historyCleaner
     boolean startCleanerService = conf.getBoolean(
         JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, true);
     if (startCleanerService) {
       long maxAgeOfHistoryFiles = conf.getLong(
-          JHAdminConfig.MR_HISTORY_MAX_AGE_MS, DEFAULT_HISTORY_MAX_AGE);
+          JHAdminConfig.MR_HISTORY_MAX_AGE_MS,
+          JHAdminConfig.DEFAULT_MR_HISTORY_MAX_AGE);
       cleanerScheduledExecutor = new ScheduledThreadPoolExecutor(1,
-          new ThreadFactoryBuilder().setNameFormat("LogCleaner").build()
-      );
+          new ThreadFactoryBuilder().setNameFormat("LogCleaner").build());
       long runInterval = conf.getLong(
-          JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS, DEFAULT_RUN_INTERVAL);
+          JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS,
+          JHAdminConfig.DEFAULT_MR_HISTORY_CLEANER_INTERVAL_MS);
       cleanerScheduledExecutor
           .scheduleAtFixedRate(new HistoryCleaner(maxAgeOfHistoryFiles),
               30 * 1000l, runInterval, TimeUnit.MILLISECONDS);
     }
     super.start();
   }
-  
+
   @Override
   public void stop() {
     LOG.info("Stopping JobHistory");
@@ -323,281 +178,16 @@ public class JobHistory extends AbstractService implements HistoryContext   {
         LOG.warn("HistoryCleanerService shutdown may not have succeeded");
       }
     }
+    if (storage instanceof Service) {
+      ((Service) storage).stop();
+    }
+    hsManager.stop();
     super.stop();
   }
-  
+
   public JobHistory() {
     super(JobHistory.class.getName());
   }
-  
-  /**
-   * Populates index data structures.
-   * Should only be called at initialization times.
-   */
-  @SuppressWarnings("unchecked")
-  private void initExisting() throws IOException {
-    LOG.info("Initializing Existing Jobs...");
-    List<FileStatus> timestampedDirList = findTimestampedDirectories();
-    Collections.sort(timestampedDirList);
-    for (FileStatus fs : timestampedDirList) {
-      //TODO Could verify the correct format for these directories.
-      addDirectoryToSerialNumberIndex(fs.getPath());
-      addDirectoryToJobListCache(fs.getPath());
-    }
-  }
-  
-  private void removeDirectoryFromSerialNumberIndex(Path serialDirPath) {
-    String serialPart = serialDirPath.getName();
-    String timeStampPart = 
-      JobHistoryUtils.getTimestampPartFromPath(serialDirPath.toString());
-    if (timeStampPart == null) {
-      LOG.warn("Could not find timestamp portion from path: " + 
-          serialDirPath.toString() +". Continuing with next");
-      return;
-    }
-    if (serialPart == null) {
-      LOG.warn("Could not find serial portion from path: " + 
-          serialDirPath.toString() + ". Continuing with next");
-      return;
-    }
-    if (idToDateString.containsKey(serialPart)) {
-      Set<String> set = idToDateString.get(serialPart);
-      set.remove(timeStampPart);
-      if (set.isEmpty()) {
-        idToDateString.remove(serialPart);
-      }
-    }
-
-  }
-  
-  private void addDirectoryToSerialNumberIndex(Path serialDirPath) {
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Adding "+serialDirPath+" to serial index");
-    }
-    String serialPart = serialDirPath.getName();
-    String timestampPart = 
-      JobHistoryUtils.getTimestampPartFromPath(serialDirPath.toString());
-    if (timestampPart == null) {
-      LOG.warn("Could not find timestamp portion from path: " + 
-          serialDirPath.toString() +". Continuing with next");
-      return;
-    }
-    if (serialPart == null) {
-      LOG.warn("Could not find serial portion from path: " + 
-          serialDirPath.toString() + ". Continuing with next");
-    }
-    addToSerialNumberIndex(serialPart, timestampPart);
-  }
-
-  private void addToSerialNumberIndex(String serialPart, String timestampPart) {
-      if (!idToDateString.containsKey(serialPart)) {
-        idToDateString.put(serialPart, new HashSet<String>());
-        if (idToDateString.size() > dateStringCacheSize) {
-          idToDateString.remove(idToDateString.firstKey());
-        }
-      Set<String> datePartSet = idToDateString.get(serialPart);
-      datePartSet.add(timestampPart);
-    }
-  }
-  
-  private void addDirectoryToJobListCache(Path path) throws IOException {
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Adding "+path+" to job list cache.");
-    }
-    List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(path,
-        doneDirFc);
-    for (FileStatus fs : historyFileList) {
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("Adding in history for "+fs.getPath());
-      }
-      JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
-          .getName());
-      String confFileName = JobHistoryUtils
-          .getIntermediateConfFileName(jobIndexInfo.getJobId());
-      String summaryFileName = JobHistoryUtils
-          .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
-      MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
-          .getParent(), confFileName), new Path(fs.getPath().getParent(),
-          summaryFileName), jobIndexInfo);
-      addToJobListCache(jobIndexInfo.getJobId(), metaInfo);
-    }
-  }
-  
-  private static List<FileStatus> scanDirectory(Path path, FileContext fc,
-      PathFilter pathFilter) throws IOException {
-    path = fc.makeQualified(path);
-    List<FileStatus> jhStatusList = new ArrayList<FileStatus>();
-      RemoteIterator<FileStatus> fileStatusIter = fc.listStatus(path);
-      while (fileStatusIter.hasNext()) {
-        FileStatus fileStatus = fileStatusIter.next();
-        Path filePath = fileStatus.getPath();
-        if (fileStatus.isFile() && pathFilter.accept(filePath)) {
-          jhStatusList.add(fileStatus);
-        }
-      }    
-    return jhStatusList;
-  }
-  
-  private static List<FileStatus> scanDirectoryForHistoryFiles(Path path,
-      FileContext fc) throws IOException {
-    return scanDirectory(path, fc, JobHistoryUtils.getHistoryFileFilter());
-  }
-  
-  /**
-   * Finds all history directories with a timestamp component by scanning 
-   * the filesystem.
-   * Used when the JobHistory server is started.
-   * @return
-   */
-  private List<FileStatus> findTimestampedDirectories() throws IOException {
-    List<FileStatus> fsList = JobHistoryUtils.localGlobber(doneDirFc, 
-        doneDirPrefixPath, DONE_BEFORE_SERIAL_TAIL);
-    return fsList;
-  }
-    
-  /**
-   * Adds an entry to the job list cache. Maintains the size.
-   */
-  private void addToJobListCache(JobId jobId, MetaInfo metaInfo) {
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Adding "+jobId+" to job list cache with "
-          +metaInfo.getJobIndexInfo());
-    }
-    jobListCache.put(jobId, metaInfo);
-    if (jobListCache.size() > jobListCacheSize) {
-      jobListCache.remove(jobListCache.firstKey());
-    }
-  }
-
-  /**
-   * Adds an entry to the loaded job cache. Maintains the size.
-   */
-  private void addToLoadedJobCache(Job job) {
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Adding "+job.getID()+" to loaded job cache");
-    }
-    loadedJobCache.put(job.getID(), job);
-  }
-  
-  
-  /**
-   * Scans the intermediate directory to find user directories. Scans these
-   * for history files if the modification time for the directory has changed.
-   * @throws IOException
-   */
-  private void scanIntermediateDirectory() throws IOException {
-    List<FileStatus> userDirList = 
-      JobHistoryUtils.localGlobber(intermediateDoneDirFc, intermediateDoneDirPath, "");
-    
-    for (FileStatus userDir : userDirList) {
-      String name = userDir.getPath().getName();
-      long newModificationTime = userDir.getModificationTime();
-      boolean shouldScan = false;
-      synchronized (userDirModificationTimeMap) {
-        if (!userDirModificationTimeMap.containsKey(name) || newModificationTime
-            > userDirModificationTimeMap.get(name)) {
-            shouldScan = true;
-            userDirModificationTimeMap.put(name, newModificationTime);
-        }  
-        }  
-      if (shouldScan) {
-        scanIntermediateDirectory(userDir.getPath());
-      }
-    }
-  }
-
-  /**
-   * Scans the specified path and populates the intermediate cache.
-   * @param absPath
-   * @throws IOException
-   */
-  private void scanIntermediateDirectory(final Path absPath)
-      throws IOException {
-    List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(absPath,
-        intermediateDoneDirFc);
-    for (FileStatus fs : fileStatusList) {
-      JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
-          .getName());
-      String confFileName = JobHistoryUtils
-          .getIntermediateConfFileName(jobIndexInfo.getJobId());
-      String summaryFileName = JobHistoryUtils
-          .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
-      MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
-          .getParent(), confFileName), new Path(fs.getPath().getParent(),
-          summaryFileName), jobIndexInfo);
-      if (!intermediateListCache.containsKey(jobIndexInfo.getJobId())) {
-        intermediateListCache.put(jobIndexInfo.getJobId(), metaInfo);
-      }
-    }
-  }
-  
-  /**
-   * Searches the job history file FileStatus list for the specified JobId.
-   * 
-   * @param fileStatusList fileStatus list of Job History Files.
-   * @param jobId The JobId to find.
-   * @param checkForDoneFile whether to check for the existance of a done file.
-   * @return A MetaInfo object for the jobId, null if not found.
-   * @throws IOException
-   */
-  private MetaInfo getJobMetaInfo(List<FileStatus> fileStatusList, JobId jobId) 
-  throws IOException {
-    for (FileStatus fs : fileStatusList) {
-      JobIndexInfo jobIndexInfo = 
-        FileNameIndexUtils.getIndexInfo(fs.getPath().getName());
-      if (jobIndexInfo.getJobId().equals(jobId)) {
-        String confFileName = JobHistoryUtils
-            .getIntermediateConfFileName(jobIndexInfo.getJobId());
-        String summaryFileName = JobHistoryUtils
-            .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
-        MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
-            .getParent(), confFileName), new Path(fs.getPath().getParent(),
-            summaryFileName), jobIndexInfo);
-        return metaInfo;
-      }
-    }
-    return null;
-  }
-  
-  /**
-   * Scans old directories known by the idToDateString map for the specified 
-   * jobId.
-   * If the number of directories is higher than the supported size of the
-   * idToDateString cache, the jobId will not be found.
-   * @param jobId the jobId.
-   * @return
-   * @throws IOException
-   */
-  private MetaInfo scanOldDirsForJob(JobId jobId) throws IOException {
-    int jobSerialNumber = JobHistoryUtils.jobSerialNumber(jobId);
-    String boxedSerialNumber = String.valueOf(jobSerialNumber);
-    Set<String> dateStringSet = idToDateString.get(boxedSerialNumber);
-    if (dateStringSet == null) {
-      return null;
-    }
-    for (String timestampPart : dateStringSet) {
-      Path logDir = canonicalHistoryLogPath(jobId, timestampPart);
-      List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(logDir,
-          doneDirFc);
-      MetaInfo metaInfo = getJobMetaInfo(fileStatusList, jobId);
-      if (metaInfo != null) {
-        return metaInfo;
-      }
-    }
-    return null;
-  }
-  
-  /**
-   * Checks for the existence of the job history file in the intermediate 
-   * directory.
-   * @param jobId
-   * @return
-   * @throws IOException
-   */
-  private MetaInfo scanIntermediateForJob(JobId jobId) throws IOException {
-    scanIntermediateDirectory();
-    return intermediateListCache.get(jobId);
-  }
 
   @Override
   public String getApplicationName() {
@@ -609,486 +199,167 @@ public class JobHistory extends AbstractService implements HistoryContext   {
     private long sleepTime;
     private ThreadPoolExecutor moveToDoneExecutor = null;
     private boolean running = false;
-    
-    public void stop() {
+
+    public synchronized void stop() {
       running = false;
+      notify();
     }
-    
+
     MoveIntermediateToDoneRunnable(long sleepTime, int numMoveThreads) {
       this.sleepTime = sleepTime;
-      ThreadFactory tf = new ThreadFactoryBuilder()
-        .setNameFormat("MoveIntermediateToDone Thread #%d")
-        .build();
-      moveToDoneExecutor = new ThreadPoolExecutor(1, numMoveThreads, 1, 
+      ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
+          "MoveIntermediateToDone Thread #%d").build();
+      moveToDoneExecutor = new ThreadPoolExecutor(1, numMoveThreads, 1,
           TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
       running = true;
     }
-  
-  @Override
+
+    @Override
     public void run() {
       Thread.currentThread().setName("IntermediateHistoryScanner");
       try {
-        while (running) {
+        while (true) {
           LOG.info("Starting scan to move intermediate done files");
-          scanIntermediateDirectory();
-          for (final MetaInfo metaInfo : intermediateListCache.values()) {
+          for (final MetaInfo metaInfo : hsManager.getIntermediateMetaInfos()) {
             moveToDoneExecutor.execute(new Runnable() {
               @Override
               public void run() {
                 try {
-                moveToDone(metaInfo);
+                  hsManager.moveToDone(metaInfo);
                 } catch (IOException e) {
-                  LOG.info("Failed to process metaInfo for job: " + 
-                      metaInfo.jobIndexInfo.getJobId(), e);
+                  LOG.info(
+                      "Failed to process metaInfo for job: "
+                          + metaInfo.getJobId(), e);
                 }
               }
             });
-
           }
-          synchronized (this) { // TODO Is this really required.
+          synchronized (this) {
             try {
               this.wait(sleepTime);
             } catch (InterruptedException e) {
               LOG.info("IntermediateHistoryScannerThread interrupted");
             }
+            if (!running) {
+              break;
+            }
           }
         }
       } catch (IOException e) {
-        LOG.warn("Unable to get a list of intermediate files to be moved from: "
-            + intermediateDoneDirPath);
+        LOG.warn("Unable to get a list of intermediate files to be moved");
+        // TODO Shut down the entire process!!!!
       }
     }
   }
-  
-  private Job loadJob(MetaInfo metaInfo) {
-    synchronized(metaInfo) {
-      try {
-        Job job = new CompletedJob(conf, metaInfo.getJobIndexInfo().getJobId(), 
-            metaInfo.getHistoryFile(), false, metaInfo.getJobIndexInfo().getUser(),
-            metaInfo.getConfFile(), this.aclsMgr);
-        addToLoadedJobCache(job);
-        return job;
-      } catch (IOException e) {
-        throw new YarnException("Could not find/load job: " + 
-            metaInfo.getJobIndexInfo().getJobId(), e);
-      }
-    }
-  }
-  
-  private Map<JobId, Job> getAllJobsInternal() {
-    //TODO This should ideally be using getAllJobsMetaInfo
-    // or get rid of that method once Job has APIs for user, finishTime etc.
-    SortedMap<JobId, Job> result = new TreeMap<JobId, Job>();
-    try {
-      scanIntermediateDirectory();
-    } catch (IOException e) {
-      LOG.warn("Failed to scan intermediate directory", e);
-      throw new YarnException(e);
-    }
-    for (JobId jobId : intermediateListCache.keySet()) {
-      MetaInfo mi = intermediateListCache.get(jobId);
-      if (mi != null) {
-        result.put(jobId, new PartialJob(mi.getJobIndexInfo(), mi
-            .getJobIndexInfo().getJobId()));
-      }
-    }
-    for (JobId jobId : jobListCache.keySet()) {
-      MetaInfo mi = jobListCache.get(jobId);
-      if (mi != null) {
-        result.put(jobId, new PartialJob(mi.getJobIndexInfo(), mi
-            .getJobIndexInfo().getJobId()));
-      }
-    }
-    return result;
-  }
 
   /**
    * Helper method for test cases.
    */
   MetaInfo getJobMetaInfo(JobId jobId) throws IOException {
-    //MetaInfo available in cache.
-    MetaInfo metaInfo = null;
-    if (jobListCache.containsKey(jobId)) {
-      metaInfo = jobListCache.get(jobId);
-    }
-
-    if (metaInfo != null) {
-      return metaInfo;
-    }
-    
-    //MetaInfo not available. Check intermediate directory for meta info.
-    metaInfo = scanIntermediateForJob(jobId);
-    if (metaInfo != null) {
-      return metaInfo;
-    }
-    
-    //Intermediate directory does not contain job. Search through older ones.
-    metaInfo = scanOldDirsForJob(jobId);
-    if (metaInfo != null) {
-      return metaInfo;
-    }
-    return null;
-  }
-  
-  private Job findJob(JobId jobId) throws IOException {
-    //Job already loaded.
-    if (loadedJobCache.containsKey(jobId)) {
-      return loadedJobCache.get(jobId);        
-    }
-    
-    //MetaInfo available in cache.
-    MetaInfo metaInfo = null;
-    if (jobListCache.containsKey(jobId)) {
-      metaInfo = jobListCache.get(jobId);
-    }
-
-    if (metaInfo != null) {
-      return loadJob(metaInfo);
-    }
-    
-    //MetaInfo not available. Check intermediate directory for meta info.
-    metaInfo = scanIntermediateForJob(jobId);
-    if (metaInfo != null) {
-      return loadJob(metaInfo);
-    }
-    
-    //Intermediate directory does not contain job. Search through older ones.
-    metaInfo = scanOldDirsForJob(jobId);
-    if (metaInfo != null) {
-      return loadJob(metaInfo);
-    }
-    return null;
-  }
-  
-  private void moveToDone(MetaInfo metaInfo) throws IOException {
-    long completeTime = metaInfo.getJobIndexInfo().getFinishTime();
-    if (completeTime == 0) completeTime = System.currentTimeMillis();
-    JobId jobId = metaInfo.getJobIndexInfo().getJobId();
-    
-    List<Path> paths = new ArrayList<Path>();
-    Path historyFile = metaInfo.getHistoryFile();
-    if (historyFile == null) {
-      LOG.info("No file for job-history with " + jobId + " found in cache!");
-    } else {
-      paths.add(historyFile);
-    }
-    
-    Path confFile = metaInfo.getConfFile();
-    if (confFile == null) {
-      LOG.info("No file for jobConf with " + jobId + " found in cache!");
-    } else {
-      paths.add(confFile);
-    }
-    
-    //TODO Check all mi getters and setters for the conf path
-    Path summaryFile = metaInfo.getSummaryFile();
-    if (summaryFile == null) {
-      LOG.info("No summary file for job: " + jobId);
-    } else {
-      try {
-        String jobSummaryString = getJobSummary(intermediateDoneDirFc, summaryFile);
-        SUMMARY_LOG.info(jobSummaryString);
-        LOG.info("Deleting JobSummary file: [" + summaryFile + "]");
-        intermediateDoneDirFc.delete(summaryFile, false);
-        metaInfo.setSummaryFile(null);
-      } catch (IOException e) {
-        LOG.warn("Failed to process summary file: [" + summaryFile + "]");
-        throw e;
-      }
-    }
-
-    Path targetDir = canonicalHistoryLogPath(jobId, completeTime);
-    addDirectoryToSerialNumberIndex(targetDir);
-    try {
-      maybeMakeSubdirectory(targetDir);
-    } catch (IOException e) {
-      LOG.warn("Failed creating subdirectory: " + targetDir + 
-          " while attempting to move files for jobId: " + jobId);
-      throw e;
-    }
-    synchronized (metaInfo) {
-      if (historyFile != null) {
-        Path toPath = doneDirFc.makeQualified(new Path(targetDir, 
-            historyFile.getName()));
-        try {
-          moveToDoneNow(historyFile, toPath);
-        } catch (IOException e) {
-          LOG.warn("Failed to move file: " + historyFile + " for jobId: "
-              + jobId);
-          throw e;
-        }
-        metaInfo.setHistoryFile(toPath);
-      }
-      if (confFile != null) {
-        Path toPath = doneDirFc.makeQualified(new Path(targetDir, 
-            confFile.getName()));
-        try {
-          moveToDoneNow(confFile, toPath);
-        } catch (IOException e) {
-          LOG.warn("Failed to move file: " + historyFile + " for jobId: " 
-              + jobId);
-          throw e;
-        }
-        metaInfo.setConfFile(toPath);
-      }
-    }
-    addToJobListCache(jobId, metaInfo);
-    intermediateListCache.remove(jobId);
-  }
-  
-  private void moveToDoneNow(final Path src, final Path target)
-      throws IOException {
-    LOG.info("Moving " + src.toString() + " to " + target.toString());
-    intermediateDoneDirFc.rename(src, target, Options.Rename.NONE);
-    // fc.util().copy(src, target);
-    //fc.delete(src, false);
-    //intermediateDoneDirFc.setPermission(target, new FsPermission(
-    //JobHistoryUtils.HISTORY_DONE_FILE_PERMISSION));
-  }
-  
-  String getJobSummary(FileContext fc, Path path) throws IOException {
-    Path qPath = fc.makeQualified(path);
-    FSDataInputStream in = fc.open(qPath);
-    String jobSummaryString = in.readUTF();
-    in.close();
-    return jobSummaryString;
+    return hsManager.getMetaInfo(jobId);
   }
-  
-  private void maybeMakeSubdirectory(Path path) throws IOException {
-    boolean existsInExistingCache = false;
-    synchronized(existingDoneSubdirs) {
-      if (existingDoneSubdirs.contains(path)) existsInExistingCache = true;
-    }
-    try {
-      doneDirFc.getFileStatus(path);
-      if (!existsInExistingCache) {
-        existingDoneSubdirs.add(path);
-        if (debugMode) {
-          LOG.info("JobHistory.maybeMakeSubdirectory -- We believed "
-                             + path + " already existed, but it didn't.");
-        }
-      }
-    } catch (FileNotFoundException fnfE) {
-      try {
-        FsPermission fsp = 
-          new FsPermission(JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION);
-        doneDirFc.mkdir(path, fsp, true);
-        FileStatus fsStatus = doneDirFc.getFileStatus(path);
-        LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
-            + ", Expected: " + fsp.toShort());
-        if (fsStatus.getPermission().toShort() != fsp.toShort()) {
-          LOG.info("Explicitly setting permissions to : " + fsp.toShort()
-              + ", " + fsp);
-          doneDirFc.setPermission(path, fsp);
-        }
-        synchronized(existingDoneSubdirs) {
-          existingDoneSubdirs.add(path);
-        }
-      } catch (FileAlreadyExistsException faeE) { //Nothing to do.
-      }
-    }
-  }
-  
-  private Path canonicalHistoryLogPath(JobId id, String timestampComponent) {
-    return new Path(doneDirPrefixPath, 
-        JobHistoryUtils.historyLogSubdirectory(id, timestampComponent, serialNumberFormat));
-  }
-  
-  private Path canonicalHistoryLogPath(JobId id, long millisecondTime) {
-    String timestampComponent = 
-      JobHistoryUtils.timestampDirectoryComponent(millisecondTime, debugMode);
-    return new Path(doneDirPrefixPath, 
-        JobHistoryUtils.historyLogSubdirectory(id, timestampComponent, serialNumberFormat));
-  }  
-  
 
   @Override
-  public synchronized Job getJob(JobId jobId) {
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Looking for Job "+jobId);
-    }
-    Job job = null;
-    try {
-      job = findJob(jobId);
-      //This could return a null job.
-    } catch (IOException e) {
-      throw new YarnException(e);
-    }
-    return job;
+  public Job getJob(JobId jobId) {
+    return storage.getFullJob(jobId);
   }
 
   @Override
   public Map<JobId, Job> getAllJobs(ApplicationId appID) {
-    if(LOG.isDebugEnabled()) {
+    if (LOG.isDebugEnabled()) {
       LOG.debug("Called getAllJobs(AppId): " + appID);
     }
-//    currently there is 1 to 1 mapping between app and job id
+    // currently there is 1 to 1 mapping between app and job id
     org.apache.hadoop.mapreduce.JobID oldJobID = TypeConverter.fromYarn(appID);
     Map<JobId, Job> jobs = new HashMap<JobId, Job>();
     JobId jobID = TypeConverter.toYarn(oldJobID);
     jobs.put(jobID, getJob(jobID));
     return jobs;
-//    return getAllJobs();
   }
-  
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.mapreduce.v2.hs.HistoryContext#getAllJobs()
-   * 
-   * Returns a recent list of jobs. This may not be the complete set.
-   * If a previous jobId is known - it can be queries via the getJob(JobId)
-   * method.
-   * Size of this list is determined by the size of the job list cache.
-   * This can be fixed when pagination is implemented - return the first set of
-   * jobs via the cache, go to DFS only when an attempt is made to navigate
-   * past the cached list.
-   * This does involve a DFS oepration of scanning the intermediate directory.
-   */
+
+  @Override
   public Map<JobId, Job> getAllJobs() {
-    LOG.debug("Called getAllJobs()");
-    return getAllJobsInternal();
+    return storage.getAllPartialJobs();
   }
 
-  static class MetaInfo {
-    private Path historyFile;
-    private Path confFile; 
-    private Path summaryFile;
-    JobIndexInfo jobIndexInfo;
-
-    MetaInfo(Path historyFile, Path confFile, Path summaryFile, 
-        JobIndexInfo jobIndexInfo) {
-      this.historyFile = historyFile;
-      this.confFile = confFile;
-      this.summaryFile = summaryFile;
-      this.jobIndexInfo = jobIndexInfo;
-    }
-
-    Path getHistoryFile() { return historyFile; }
-    Path getConfFile() { return confFile; }
-    Path getSummaryFile() { return summaryFile; }
-    JobIndexInfo getJobIndexInfo() { return jobIndexInfo; }
-    
-    void setHistoryFile(Path historyFile) { this.historyFile = historyFile; }
-    void setConfFile(Path confFile) {this.confFile = confFile; }
-    void setSummaryFile(Path summaryFile) { this.summaryFile = summaryFile; }
+  /**
+   * Look for a set of partial jobs.
+   * 
+   * @param offset
+   *          the offset into the list of jobs.
+   * @param count
+   *          the maximum number of jobs to return.
+   * @param user
+   *          only return jobs for the given user.
+   * @param queue
+   *          only return jobs for in the given queue.
+   * @param sBegin
+   *          only return Jobs that started on or after the given time.
+   * @param sEnd
+   *          only return Jobs that started on or before the given time.
+   * @param fBegin
+   *          only return Jobs that ended on or after the given time.
+   * @param fEnd
+   *          only return Jobs that ended on or before the given time.
+   * @param jobState
+   *          only return jobs that are in the given job state.
+   * @return The list of filtered jobs.
+   */
+  @Override
+  public JobsInfo getPartialJobs(Long offset, Long count, String user,
+      String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd,
+      JobState jobState) {
+    return storage.getPartialJobs(offset, count, user, queue, sBegin, sEnd,
+        fBegin, fEnd, jobState);
   }
-  
 
   public class HistoryCleaner implements Runnable {
-    private long currentTime;
-    
     long maxAgeMillis;
-    long filesDeleted = 0;
-    long dirsDeleted = 0;
-    
+
     public HistoryCleaner(long maxAge) {
       this.maxAgeMillis = maxAge;
     }
-    
-    @SuppressWarnings("unchecked")
+
     public void run() {
       LOG.info("History Cleaner started");
-      currentTime = System.currentTimeMillis();
-      boolean halted = false;
-      //TODO Delete YYYY/MM/DD directories.
+      long cutoff = System.currentTimeMillis() - maxAgeMillis;
       try {
-        List<FileStatus> serialDirList = findTimestampedDirectories();
-        //Sort in ascending order. Relies on YYYY/MM/DD/Serial
-        Collections.sort(serialDirList);
-        for (FileStatus serialDir : serialDirList) {
-          List<FileStatus> historyFileList = 
-            scanDirectoryForHistoryFiles(serialDir.getPath(), doneDirFc);
-          for (FileStatus historyFile : historyFileList) {
-            JobIndexInfo jobIndexInfo = 
-              FileNameIndexUtils.getIndexInfo(historyFile.getPath().getName());
-            long effectiveTimestamp = 
-              getEffectiveTimestamp(jobIndexInfo.getFinishTime(), historyFile);
-            if (shouldDelete(effectiveTimestamp)) {
-              String confFileName = 
-                JobHistoryUtils.getIntermediateConfFileName(jobIndexInfo.getJobId());
-              MetaInfo metaInfo = new MetaInfo(historyFile.getPath(),
-                  new Path(historyFile.getPath().getParent(), confFileName), 
-                  null, jobIndexInfo);
-              delete(metaInfo);
-            } else {
-              halted = true;
-              break;
-            }
-          }
-          if (!halted) {
-            deleteDir(serialDir.getPath());
-            removeDirectoryFromSerialNumberIndex(serialDir.getPath());
-            synchronized (existingDoneSubdirs) {
-              existingDoneSubdirs.remove(serialDir.getPath());  
-            }
-            
-          } else {
-            break; //Don't scan any more directories.
-    }
-  }
+        hsManager.clean(cutoff, storage);
       } catch (IOException e) {
-        LOG.warn("Error in History cleaner run", e);
+        LOG.warn("Error trying to clean up ", e);
       }
       LOG.info("History Cleaner complete");
-      LOG.info("FilesDeleted: " + filesDeleted);
-      LOG.info("Directories Deleted: " + dirsDeleted);
-    }
-    
-    private boolean shouldDelete(long ts) {
-      return ((ts + maxAgeMillis) <= currentTime);
-    }
-    
-    private long getEffectiveTimestamp(long finishTime, FileStatus fileStatus) {
-      if (finishTime == 0) {
-        return fileStatus.getModificationTime();
-      }
-      return finishTime;
-    }
-    
-    private void delete(MetaInfo metaInfo) throws IOException {
-      deleteFile(metaInfo.getHistoryFile());
-      deleteFile(metaInfo.getConfFile());
-      jobListCache.remove(metaInfo.getJobIndexInfo().getJobId());
-      loadedJobCache.remove(metaInfo.getJobIndexInfo().getJobId());
-    }
-    
-    private void deleteFile(final Path path) throws IOException {
-      doneDirFc.delete(doneDirFc.makeQualified(path), false);
-      filesDeleted++;
-    }
-    
-    private void deleteDir(Path path) throws IOException {
-      doneDirFc.delete(doneDirFc.makeQualified(path), true);
-      dirsDeleted++;
-    }
     }
-    
-  
-  
-  //TODO AppContext - Not Required
-  private  ApplicationAttemptId appAttemptID;
+  }
+
+  // TODO AppContext - Not Required
+  private ApplicationAttemptId appAttemptID;
+
   @Override
   public ApplicationAttemptId getApplicationAttemptId() {
-  //TODO fixme - bogus appAttemptID for now
+    // TODO fixme - bogus appAttemptID for now
     return appAttemptID;
-  }  
-  
-  //TODO AppContext - Not Required
+  }
+
+  // TODO AppContext - Not Required
   private ApplicationId appID;
+
   @Override
   public ApplicationId getApplicationID() {
-  //TODO fixme - bogus appID for now
+    // TODO fixme - bogus appID for now
     return appID;
   }
-  
-  //TODO AppContext - Not Required
+
+  // TODO AppContext - Not Required
   @Override
   public EventHandler getEventHandler() {
     // TODO Auto-generated method stub
     return null;
   }
-  
-  //TODO AppContext - Not Required
+
+  // TODO AppContext - Not Required
   private String userName;
+
   @Override
   public CharSequence getUser() {
     if (userName != null) {
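Taken together, the JobHistory.java changes above split the old monolithic class into three cooperating pieces: HistoryFileManager owns the done/intermediate directories and the move/clean work, a pluggable HistoryStorage answers all job queries, and JobHistory itself only wires the two together and runs the mover and cleaner threads. The storage implementation is picked at init time from the new JHAdminConfig.MR_HISTORY_STORAGE key (defaulting to CachedHistoryStorage) and, if it also extends Service, it is carried through the init/start/stop lifecycle. The following is a minimal wiring sketch, not part of the patch: it assumes a working history-server configuration (done and intermediate directories reachable), and HistoryStorageWiringSketch is only an illustrative class name.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.v2.hs.CachedHistoryStorage;
    import org.apache.hadoop.mapreduce.v2.hs.HistoryStorage;
    import org.apache.hadoop.mapreduce.v2.hs.JobHistory;
    import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;

    public class HistoryStorageWiringSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Select the HistoryStorage implementation. CachedHistoryStorage is the
        // default, so this line only makes the default explicit; substituting any
        // other class implementing HistoryStorage is what makes storage pluggable.
        conf.setClass(JHAdminConfig.MR_HISTORY_STORAGE,
            CachedHistoryStorage.class, HistoryStorage.class);

        JobHistory jobHistory = new JobHistory();
        jobHistory.init(conf);  // creates the HistoryFileManager, then instantiates
                                // the storage via ReflectionUtils and hands it the
                                // file manager
        jobHistory.start();     // also starts the storage if it extends Service
      }
    }

Keeping the file handling behind HistoryFileManager is what would let a replacement storage (for example, one backed by a database) reuse the existing .jhist scanning and retention logic rather than reimplementing it.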

+ 1 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java

@@ -51,6 +51,7 @@ public class PartialJob implements org.apache.hadoop.mapreduce.v2.app.job.Job {
     jobReport = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobReport.class);
     jobReport.setStartTime(jobIndexInfo.getSubmitTime());
     jobReport.setFinishTime(jobIndexInfo.getFinishTime());
+    jobReport.setJobState(getState());
   }
   
   @Override
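The single line added to PartialJob is what makes state filtering possible on the cheap listing path: a PartialJob is built straight from a JobIndexInfo without reading the .jhist file, so its state has to be copied onto the JobReport explicitly. A hedged illustration follows; jobIndexInfo is assumed to be a parsed JobIndexInfo already in scope.

    // Illustration only: after the setJobState(getState()) call above, the
    // report of a PartialJob carries the same state that getState() derives
    // from the index info, so getPartialJobs can filter by state without
    // loading the full job history file.
    PartialJob partial = new PartialJob(jobIndexInfo, jobIndexInfo.getJobId());
    assert partial.getReport().getJobState() == partial.getState();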

+ 1 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java

@@ -44,6 +44,7 @@ public class HsWebApp extends WebApp implements AMParams {
     bind(JAXBContextResolver.class);
     bind(GenericExceptionHandler.class);
     bind(AppContext.class).toInstance(history);
+    bind(HistoryContext.class).toInstance(history);
     route("/", HsController.class);
     route("/app", HsController.class);
     route(pajoin("/job", JOB_ID), HsController.class, "job");

+ 33 - 79
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java

@@ -32,10 +32,8 @@ import javax.ws.rs.core.UriInfo;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
-import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
-import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
@@ -49,6 +47,7 @@ import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskAttemptInfo;
 import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskAttemptsInfo;
 import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskInfo;
 import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TasksInfo;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryContext;
 import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.AMAttemptInfo;
 import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.AMAttemptsInfo;
 import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.HistoryInfo;
@@ -64,7 +63,7 @@ import com.google.inject.Inject;
 
 @Path("/ws/v1/history")
 public class HsWebServices {
-  private final AppContext appCtx;
+  private final HistoryContext ctx;
   private WebApp webapp;
   private final Configuration conf;
 
@@ -72,9 +71,9 @@ public class HsWebServices {
   UriInfo uriInfo;
 
   @Inject
-  public HsWebServices(final AppContext appCtx, final Configuration conf,
+  public HsWebServices(final HistoryContext ctx, final Configuration conf,
       final WebApp webapp) {
-    this.appCtx = appCtx;
+    this.ctx = ctx;
     this.conf = conf;
     this.webapp = webapp;
   }
@@ -103,33 +102,22 @@ public class HsWebServices {
       @QueryParam("startedTimeEnd") String startedEnd,
       @QueryParam("finishedTimeBegin") String finishBegin,
       @QueryParam("finishedTimeEnd") String finishEnd) {
-    JobsInfo allJobs = new JobsInfo();
-    long num = 0;
-    boolean checkCount = false;
-    boolean checkStart = false;
-    boolean checkEnd = false;
-    long countNum = 0;
-
-    // set values suitable in case both of begin/end not specified
-    long sBegin = 0;
-    long sEnd = Long.MAX_VALUE;
-    long fBegin = 0;
-    long fEnd = Long.MAX_VALUE;
 
+    Long countParam = null;
+    
     if (count != null && !count.isEmpty()) {
-      checkCount = true;
       try {
-        countNum = Long.parseLong(count);
+        countParam = Long.parseLong(count);
       } catch (NumberFormatException e) {
         throw new BadRequestException(e.getMessage());
       }
-      if (countNum <= 0) {
+      if (countParam <= 0) {
         throw new BadRequestException("limit value must be greater then 0");
       }
     }
 
+    Long sBegin = null;
     if (startedBegin != null && !startedBegin.isEmpty()) {
-      checkStart = true;
       try {
         sBegin = Long.parseLong(startedBegin);
       } catch (NumberFormatException e) {
@@ -139,8 +127,9 @@ public class HsWebServices {
         throw new BadRequestException("startedTimeBegin must be greater than 0");
       }
     }
+    
+    Long sEnd = null;
     if (startedEnd != null && !startedEnd.isEmpty()) {
-      checkStart = true;
       try {
         sEnd = Long.parseLong(startedEnd);
       } catch (NumberFormatException e) {
@@ -150,13 +139,13 @@ public class HsWebServices {
         throw new BadRequestException("startedTimeEnd must be greater than 0");
       }
     }
-    if (sBegin > sEnd) {
+    if (sBegin != null && sEnd != null && sBegin > sEnd) {
       throw new BadRequestException(
           "startedTimeEnd must be greater than startTimeBegin");
     }
 
+    Long fBegin = null;
     if (finishBegin != null && !finishBegin.isEmpty()) {
-      checkEnd = true;
       try {
         fBegin = Long.parseLong(finishBegin);
       } catch (NumberFormatException e) {
@@ -166,8 +155,8 @@ public class HsWebServices {
         throw new BadRequestException("finishedTimeBegin must be greater than 0");
       }
     }
+    Long fEnd = null;
     if (finishEnd != null && !finishEnd.isEmpty()) {
-      checkEnd = true;
       try {
         fEnd = Long.parseLong(finishEnd);
       } catch (NumberFormatException e) {
@@ -177,53 +166,18 @@ public class HsWebServices {
         throw new BadRequestException("finishedTimeEnd must be greater than 0");
       }
     }
-    if (fBegin > fEnd) {
+    if (fBegin != null && fEnd != null && fBegin > fEnd) {
       throw new BadRequestException(
           "finishedTimeEnd must be greater than finishedTimeBegin");
     }
-
-    for (Job job : appCtx.getAllJobs().values()) {
-      if (checkCount && num == countNum) {
-        break;
-      }
-
-      if (stateQuery != null && !stateQuery.isEmpty()) {
-        JobState.valueOf(stateQuery);
-        if (!job.getState().toString().equalsIgnoreCase(stateQuery)) {
-          continue;
-        }
-      }
-
-      // can't really validate queue is a valid one since queues could change
-      if (queueQuery != null && !queueQuery.isEmpty()) {
-        if (!job.getQueueName().equals(queueQuery)) {
-          continue;
-        }
-      }
-
-      if (userQuery != null && !userQuery.isEmpty()) {
-        if (!job.getUserName().equals(userQuery)) {
-          continue;
-        }
-      }
-
-      JobReport report = job.getReport();
-      
-      if (checkStart
-          && (report.getStartTime() < sBegin || report.getStartTime() > sEnd)) {
-        continue;
-      }
-      if (checkEnd
-          && (report.getFinishTime() < fBegin || report.getFinishTime() > fEnd)) {
-        continue;
-      }
-      
-      JobInfo jobInfo = new JobInfo(job);
-      
-      allJobs.add(jobInfo);
-      num++;
+    
+    JobState jobState = null;
+    if (stateQuery != null) {
+      jobState = JobState.valueOf(stateQuery);
     }
-    return allJobs;
+
+    return ctx.getPartialJobs(0l, countParam, userQuery, queueQuery, 
+        sBegin, sEnd, fBegin, fEnd, jobState);
   }
 
   @GET
@@ -231,7 +185,7 @@ public class HsWebServices {
   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
   public JobInfo getJob(@PathParam("jobid") String jid) {
 
-    Job job = AMWebServices.getJobFromJobIdString(jid, appCtx);
+    Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
     return new JobInfo(job);
   }
 
@@ -240,7 +194,7 @@ public class HsWebServices {
   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
   public AMAttemptsInfo getJobAttempts(@PathParam("jobid") String jid) {
 
-    Job job = AMWebServices.getJobFromJobIdString(jid, appCtx);
+    Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
     AMAttemptsInfo amAttempts = new AMAttemptsInfo();
     for (AMInfo amInfo : job.getAMInfos()) {
       AMAttemptInfo attempt = new AMAttemptInfo(amInfo, MRApps.toString(job
@@ -256,8 +210,8 @@ public class HsWebServices {
   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
   public JobCounterInfo getJobCounters(@PathParam("jobid") String jid) {
 
-    Job job = AMWebServices.getJobFromJobIdString(jid, appCtx);
-    return new JobCounterInfo(this.appCtx, job);
+    Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
+    return new JobCounterInfo(this.ctx, job);
   }
 
   @GET
@@ -265,7 +219,7 @@ public class HsWebServices {
   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
   public ConfInfo getJobConf(@PathParam("jobid") String jid) {
 
-    Job job = AMWebServices.getJobFromJobIdString(jid, appCtx);
+    Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
     ConfInfo info;
     try {
       info = new ConfInfo(job, this.conf);
@@ -282,7 +236,7 @@ public class HsWebServices {
   public TasksInfo getJobTasks(@PathParam("jobid") String jid,
       @QueryParam("type") String type) {
 
-    Job job = AMWebServices.getJobFromJobIdString(jid, appCtx);
+    Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
     TasksInfo allTasks = new TasksInfo();
     for (Task task : job.getTasks().values()) {
       TaskType ttype = null;
@@ -307,7 +261,7 @@ public class HsWebServices {
   public TaskInfo getJobTask(@PathParam("jobid") String jid,
       @PathParam("taskid") String tid) {
 
-    Job job = AMWebServices.getJobFromJobIdString(jid, appCtx);
+    Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
     Task task = AMWebServices.getTaskFromTaskIdString(tid, job);
     return new TaskInfo(task);
 
@@ -319,7 +273,7 @@ public class HsWebServices {
   public JobTaskCounterInfo getSingleTaskCounters(
       @PathParam("jobid") String jid, @PathParam("taskid") String tid) {
 
-    Job job = AMWebServices.getJobFromJobIdString(jid, appCtx);
+    Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
     TaskId taskID = MRApps.toTaskID(tid);
     if (taskID == null) {
       throw new NotFoundException("taskid " + tid + " not found or invalid");
@@ -338,7 +292,7 @@ public class HsWebServices {
       @PathParam("taskid") String tid) {
 
     TaskAttemptsInfo attempts = new TaskAttemptsInfo();
-    Job job = AMWebServices.getJobFromJobIdString(jid, appCtx);
+    Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
     Task task = AMWebServices.getTaskFromTaskIdString(tid, job);
     for (TaskAttempt ta : task.getAttempts().values()) {
       if (ta != null) {
@@ -358,7 +312,7 @@ public class HsWebServices {
   public TaskAttemptInfo getJobTaskAttemptId(@PathParam("jobid") String jid,
       @PathParam("taskid") String tid, @PathParam("attemptid") String attId) {
 
-    Job job = AMWebServices.getJobFromJobIdString(jid, appCtx);
+    Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
     Task task = AMWebServices.getTaskFromTaskIdString(tid, job);
     TaskAttempt ta = AMWebServices.getTaskAttemptFromTaskAttemptString(attId,
         task);
@@ -376,7 +330,7 @@ public class HsWebServices {
       @PathParam("jobid") String jid, @PathParam("taskid") String tid,
       @PathParam("attemptid") String attId) {
 
-    Job job = AMWebServices.getJobFromJobIdString(jid, appCtx);
+    Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
     Task task = AMWebServices.getTaskFromTaskIdString(tid, job);
     TaskAttempt ta = AMWebServices.getTaskAttemptFromTaskAttemptString(attId,
         task);

+ 9 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java

@@ -92,6 +92,14 @@ public class TestJobHistoryParsing {
     checkHistoryParsing(3, 0, 2);
   }
   
+  private static String getJobSummary(FileContext fc, Path path) throws IOException {
+    Path qPath = fc.makeQualified(path);
+    FSDataInputStream in = fc.open(qPath);
+    String jobSummaryString = in.readUTF();
+    in.close();
+    return jobSummaryString;
+  }
+  
   private void checkHistoryParsing(final int numMaps, final int numReduces,
       final int numSuccessfulMaps) 
   throws Exception {
@@ -244,7 +252,7 @@ public class TestJobHistoryParsing {
       String summaryFileName = JobHistoryUtils
           .getIntermediateSummaryFileName(jobId);
       Path summaryFile = new Path(jobhistoryDir, summaryFileName);
-      String jobSummaryString = jobHistory.getJobSummary(fc, summaryFile);
+      String jobSummaryString = getJobSummary(fc, summaryFile);
       Assert.assertTrue(jobSummaryString.contains("resourcesPerMap=100"));
       Assert.assertTrue(jobSummaryString.contains("resourcesPerReduce=100"));
       Assert.assertNotNull(jobSummaryString);

+ 18 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServices.java

@@ -30,11 +30,13 @@ import javax.xml.parsers.DocumentBuilderFactory;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.MockJobs;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.hs.HistoryContext;
 import org.apache.hadoop.mapreduce.v2.hs.JobHistory;
+import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
 import org.apache.hadoop.util.VersionInfo;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.ClusterInfo;
@@ -77,7 +79,7 @@ public class TestHsWebServices extends JerseyTest {
   private static TestAppContext appContext;
   private static HsWebApp webApp;
 
-  static class TestAppContext implements AppContext {
+  static class TestAppContext implements HistoryContext {
     final ApplicationAttemptId appAttemptID;
     final ApplicationId appID;
     final String user = MockJobs.newUserName();
@@ -144,6 +146,20 @@ public class TestHsWebServices extends JerseyTest {
     public ClusterInfo getClusterInfo() {
       return null;
     }
+
+    @Override
+    public Map<JobId, Job> getAllJobs(ApplicationId appID) {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public JobsInfo getPartialJobs(Long offset, Long count, String user,
+        String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd,
+        JobState jobState) {
+      // TODO Auto-generated method stub
+      return null;
+    }
   }
 
   private Injector injector = Guice.createInjector(new ServletModule() {
@@ -160,6 +176,7 @@ public class TestHsWebServices extends JerseyTest {
       bind(GenericExceptionHandler.class);
       bind(WebApp.class).toInstance(webApp);
       bind(AppContext.class).toInstance(appContext);
+      bind(HistoryContext.class).toInstance(appContext);
       bind(Configuration.class).toInstance(conf);
 
       serve("/*").with(GuiceContainer.class);

+ 19 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesAttempts.java

@@ -35,6 +35,7 @@ import javax.xml.parsers.DocumentBuilderFactory;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
@@ -42,6 +43,8 @@ import org.apache.hadoop.mapreduce.v2.app.MockJobs;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryContext;
+import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.ClusterInfo;
@@ -89,7 +92,7 @@ public class TestHsWebServicesAttempts extends JerseyTest {
   private static TestAppContext appContext;
   private static HsWebApp webApp;
 
-  static class TestAppContext implements AppContext {
+  static class TestAppContext implements HistoryContext {
     final ApplicationAttemptId appAttemptID;
     final ApplicationId appID;
     final String user = MockJobs.newUserName();
@@ -156,6 +159,20 @@ public class TestHsWebServicesAttempts extends JerseyTest {
     public ClusterInfo getClusterInfo() {
       return null;
     }
+
+    @Override
+    public Map<JobId, Job> getAllJobs(ApplicationId appID) {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public JobsInfo getPartialJobs(Long offset, Long count, String user,
+        String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd,
+        JobState jobState) {
+      // TODO Auto-generated method stub
+      return null;
+    }
   }
 
   private Injector injector = Guice.createInjector(new ServletModule() {
@@ -171,6 +188,7 @@ public class TestHsWebServicesAttempts extends JerseyTest {
       bind(GenericExceptionHandler.class);
       bind(WebApp.class).toInstance(webApp);
       bind(AppContext.class).toInstance(appContext);
+      bind(HistoryContext.class).toInstance(appContext);
       bind(Configuration.class).toInstance(conf);
 
       serve("/*").with(GuiceContainer.class);

+ 19 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobConf.java

@@ -41,9 +41,12 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.MockJobs;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryContext;
+import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.ClusterInfo;
@@ -90,7 +93,7 @@ public class TestHsWebServicesJobConf extends JerseyTest {
   private static File testConfDir = new File("target",
       TestHsWebServicesJobConf.class.getSimpleName() + "confDir");
 
-  static class TestAppContext implements AppContext {
+  static class TestAppContext implements HistoryContext {
     final ApplicationAttemptId appAttemptID;
     final ApplicationId appID;
     final String user = MockJobs.newUserName();
@@ -156,6 +159,20 @@ public class TestHsWebServicesJobConf extends JerseyTest {
     public ClusterInfo getClusterInfo() {
       return null;
     }
+
+    @Override
+    public Map<JobId, Job> getAllJobs(ApplicationId appID) {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public JobsInfo getPartialJobs(Long offset, Long count, String user,
+        String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd,
+        JobState jobState) {
+      // TODO Auto-generated method stub
+      return null;
+    }
   }
 
   private Injector injector = Guice.createInjector(new ServletModule() {
@@ -195,6 +212,7 @@ public class TestHsWebServicesJobConf extends JerseyTest {
       bind(GenericExceptionHandler.class);
       bind(WebApp.class).toInstance(webApp);
       bind(AppContext.class).toInstance(appContext);
+      bind(HistoryContext.class).toInstance(appContext);
       bind(Configuration.class).toInstance(conf);
 
       serve("/*").with(GuiceContainer.class);

+ 20 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobs.java

@@ -38,11 +38,15 @@ import javax.xml.parsers.DocumentBuilderFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.MockJobs;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.hs.CachedHistoryStorage;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryContext;
 import org.apache.hadoop.mapreduce.v2.hs.MockHistoryJobs;
 import org.apache.hadoop.mapreduce.v2.hs.MockHistoryJobs.JobsPair;
+import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.ClusterInfo;
@@ -90,7 +94,7 @@ public class TestHsWebServicesJobs extends JerseyTest {
   private static TestAppContext appContext;
   private static HsWebApp webApp;
 
-  static class TestAppContext implements AppContext {
+  static class TestAppContext implements HistoryContext {
     final ApplicationAttemptId appAttemptID;
     final ApplicationId appID;
     final String user = MockJobs.newUserName();
@@ -169,6 +173,20 @@ public class TestHsWebServicesJobs extends JerseyTest {
     public ClusterInfo getClusterInfo() {
       return null;
     }
+
+    @Override
+    public Map<JobId, Job> getAllJobs(ApplicationId appID) {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public JobsInfo getPartialJobs(Long offset, Long count, String user,
+        String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd,
+        JobState jobState) {
+      return CachedHistoryStorage.getPartialJobs(this.partialJobs.values(), 
+          offset, count, user, queue, sBegin, sEnd, fBegin, fEnd, jobState);
+    }
   }
 
   private Injector injector = Guice.createInjector(new ServletModule() {
@@ -184,6 +202,7 @@ public class TestHsWebServicesJobs extends JerseyTest {
       bind(GenericExceptionHandler.class);
       bind(WebApp.class).toInstance(webApp);
       bind(AppContext.class).toInstance(appContext);
+      bind(HistoryContext.class).toInstance(appContext);
       bind(Configuration.class).toInstance(conf);
 
       serve("/*").with(GuiceContainer.class);

+ 19 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobsQuery.java

@@ -36,8 +36,11 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.MockJobs;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.hs.CachedHistoryStorage;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryContext;
 import org.apache.hadoop.mapreduce.v2.hs.MockHistoryJobs;
 import org.apache.hadoop.mapreduce.v2.hs.MockHistoryJobs.JobsPair;
+import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.ClusterInfo;
@@ -77,7 +80,7 @@ public class TestHsWebServicesJobsQuery extends JerseyTest {
   private static TestAppContext appContext;
   private static HsWebApp webApp;
 
-  static class TestAppContext implements AppContext {
+  static class TestAppContext implements HistoryContext {
     final String user = MockJobs.newUserName();
     final Map<JobId, Job> fullJobs;
     final Map<JobId, Job> partialJobs;
@@ -152,6 +155,20 @@ public class TestHsWebServicesJobsQuery extends JerseyTest {
     public ClusterInfo getClusterInfo() {
       return null;
     }
+
+    @Override
+    public Map<JobId, Job> getAllJobs(ApplicationId appID) {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public JobsInfo getPartialJobs(Long offset, Long count, String user,
+        String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd,
+        JobState jobState) {
+      return CachedHistoryStorage.getPartialJobs(this.partialJobs.values(), 
+          offset, count, user, queue, sBegin, sEnd, fBegin, fEnd, jobState);
+    }
   }
 
   private Injector injector = Guice.createInjector(new ServletModule() {
@@ -167,6 +184,7 @@ public class TestHsWebServicesJobsQuery extends JerseyTest {
       bind(GenericExceptionHandler.class);
       bind(WebApp.class).toInstance(webApp);
       bind(AppContext.class).toInstance(appContext);
+      bind(HistoryContext.class).toInstance(appContext);
       bind(Configuration.class).toInstance(conf);
 
       serve("/*").with(GuiceContainer.class);

+ 19 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesTasks.java

@@ -34,12 +34,15 @@ import javax.xml.parsers.DocumentBuilderFactory;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.MockJobs;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryContext;
+import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.ClusterInfo;
@@ -85,7 +88,7 @@ public class TestHsWebServicesTasks extends JerseyTest {
   private static TestAppContext appContext;
   private static HsWebApp webApp;
 
-  static class TestAppContext implements AppContext {
+  static class TestAppContext implements HistoryContext {
     final ApplicationAttemptId appAttemptID;
     final ApplicationId appID;
     final String user = MockJobs.newUserName();
@@ -152,6 +155,20 @@ public class TestHsWebServicesTasks extends JerseyTest {
     public ClusterInfo getClusterInfo() {
       return null;
     }
+
+    @Override
+    public Map<JobId, Job> getAllJobs(ApplicationId appID) {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public JobsInfo getPartialJobs(Long offset, Long count, String user,
+        String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd,
+        JobState jobState) {
+      // TODO Auto-generated method stub
+      return null;
+    }
   }
 
   private Injector injector = Guice.createInjector(new ServletModule() {
@@ -167,6 +184,7 @@ public class TestHsWebServicesTasks extends JerseyTest {
       bind(GenericExceptionHandler.class);
       bind(WebApp.class).toInstance(webApp);
       bind(AppContext.class).toInstance(appContext);
+      bind(HistoryContext.class).toInstance(appContext);
       bind(Configuration.class).toInstance(conf);
 
       serve("/*").with(GuiceContainer.class);