@@ -25,12 +25,17 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -57,6 +62,8 @@ import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.service.AbstractService;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * This class provides a way to interact with history files in a thread safe
  * manner.
@@ -67,33 +74,251 @@ public class HistoryFileManager extends AbstractService {
   private static final Log LOG = LogFactory.getLog(HistoryFileManager.class);
   private static final Log SUMMARY_LOG = LogFactory.getLog(JobSummary.class);
 
+  private static enum HistoryInfoState {
+    IN_INTERMEDIATE, IN_DONE, DELETED, MOVE_FAILED
+  };
+
   private static String DONE_BEFORE_SERIAL_TAIL = JobHistoryUtils
       .doneSubdirsBeforeSerialTail();
 
-  public static class MetaInfo {
+  /**
+   * Maps between a serial number (generated based on jobId) and the timestamp
+   * component(s) to which it belongs. Facilitates jobId based searches. If a
+   * jobId is not found in this list - it will not be found.
+   */
+  private static class SerialNumberIndex {
+    private SortedMap<String, Set<String>> cache;
+    private int maxSize;
+
+    public SerialNumberIndex(int maxSize) {
+      this.cache = new TreeMap<String, Set<String>>();
+      this.maxSize = maxSize;
+    }
+
+    public synchronized void add(String serialPart, String timestampPart) {
+      if (!cache.containsKey(serialPart)) {
+        cache.put(serialPart, new HashSet<String>());
+        if (cache.size() > maxSize) {
+          String key = cache.firstKey();
+          LOG.error("Dropping " + key
+              + " from the SerialNumberIndex. We will no "
+              + "longer be able to see jobs that are in that serial index for "
+              + cache.get(key));
+          cache.remove(key);
+        }
+      }
+      Set<String> datePartSet = cache.get(serialPart);
+      datePartSet.add(timestampPart);
+    }
+
+    public synchronized void remove(String serialPart, String timeStampPart) {
+      if (cache.containsKey(serialPart)) {
+        Set<String> set = cache.get(serialPart);
+        set.remove(timeStampPart);
+        if (set.isEmpty()) {
+          cache.remove(serialPart);
+        }
+      }
+    }
+
+    public synchronized Set<String> get(String serialPart) {
+      Set<String> found = cache.get(serialPart);
+      if (found != null) {
+        return new HashSet<String>(found);
+      }
+      return null;
+    }
+  }
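
The class above is a bounded multimap from serial number to the set of timestamp parts that contain it, evicting (and loudly logging) the lowest serial once maxSize distinct serials are cached. A minimal usage sketch with made-up values, since the class is private to HistoryFileManager:

    // Sketch only: hypothetical serial/timestamp values.
    SerialNumberIndex index = new SerialNumberIndex(20000);
    index.add("000001", "2012/06/01");
    index.add("000001", "2012/06/02");
    Set<String> dates = index.get("000001"); // returns a defensive copy, or null
    index.remove("000001", "2012/06/01");
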
+
+  private static class JobListCache {
+    private ConcurrentSkipListMap<JobId, HistoryFileInfo> cache;
+    private int maxSize;
+    private long maxAge;
+
+    public JobListCache(int maxSize, long maxAge) {
+      this.maxSize = maxSize;
+      this.maxAge = maxAge;
+      this.cache = new ConcurrentSkipListMap<JobId, HistoryFileInfo>();
+    }
+
+    public HistoryFileInfo addIfAbsent(HistoryFileInfo fileInfo) {
+      JobId jobId = fileInfo.getJobIndexInfo().getJobId();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Adding " + jobId + " to job list cache with "
+            + fileInfo.getJobIndexInfo());
+      }
+      HistoryFileInfo old = cache.putIfAbsent(jobId, fileInfo);
+      if (cache.size() > maxSize) {
+        //There is a race here, where more than one thread could be trying to
+        // remove entries. This could result in too many entries being removed
+        // from the cache. This is considered OK as the size of the cache
+        // should be rather large, and we would rather have performance over
+        // keeping the cache size exactly at the maximum.
+        Iterator<JobId> keys = cache.navigableKeySet().iterator();
+        long cutoff = System.currentTimeMillis() - maxAge;
+        while(cache.size() > maxSize && keys.hasNext()) {
+          JobId key = keys.next();
+          HistoryFileInfo firstValue = cache.get(key);
+          if(firstValue != null) {
+            synchronized(firstValue) {
+              if (firstValue.isMovePending()) {
+                if(firstValue.didMoveFail() &&
+                    firstValue.jobIndexInfo.getFinishTime() <= cutoff) {
+                  cache.remove(key);
+                  //Now let's try to delete it
+                  try {
+                    firstValue.delete();
+                  } catch (IOException e) {
+                    LOG.error("Error while trying to delete history files" +
+                        " that could not be moved to done.", e);
+                  }
+                } else {
+                  LOG.warn("Waiting to remove " + key
+                      + " from JobListCache because it is not in done yet.");
+                }
+              } else {
+                cache.remove(key);
+              }
+            }
+          }
+        }
+      }
+      return old;
+    }
+
+    public void delete(HistoryFileInfo fileInfo) {
+      cache.remove(fileInfo.getJobId());
+    }
+
+    public Collection<HistoryFileInfo> values() {
+      return new ArrayList<HistoryFileInfo>(cache.values());
+    }
+
+    public HistoryFileInfo get(JobId jobId) {
+      return cache.get(jobId);
+    }
+  }
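
The cache relies on two properties of ConcurrentSkipListMap: putIfAbsent atomically keeps the first HistoryFileInfo registered for a JobId (so concurrent scanners end up synchronizing on one shared object), and navigableKeySet() iterates keys in sorted order, so eviction starts from the smallest JobId. A small sketch of just the putIfAbsent contract, with hypothetical key/value types:

    // Sketch only: the first value wins; later callers get the existing one back.
    ConcurrentSkipListMap<String, String> m =
        new ConcurrentSkipListMap<String, String>();
    String prev1 = m.putIfAbsent("job_1", "first");  // returns null
    String prev2 = m.putIfAbsent("job_1", "second"); // returns "first"
    assert prev1 == null && "first".equals(prev2) && "first".equals(m.get("job_1"));
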
+
+  public class HistoryFileInfo {
     private Path historyFile;
     private Path confFile;
     private Path summaryFile;
     private JobIndexInfo jobIndexInfo;
+    private HistoryInfoState state;
 
-    public MetaInfo(Path historyFile, Path confFile, Path summaryFile,
-        JobIndexInfo jobIndexInfo) {
+    private HistoryFileInfo(Path historyFile, Path confFile, Path summaryFile,
+        JobIndexInfo jobIndexInfo, boolean isInDone) {
       this.historyFile = historyFile;
       this.confFile = confFile;
       this.summaryFile = summaryFile;
       this.jobIndexInfo = jobIndexInfo;
+      state = isInDone ? HistoryInfoState.IN_DONE
+          : HistoryInfoState.IN_INTERMEDIATE;
     }
 
-    private Path getHistoryFile() {
-      return historyFile;
+    private synchronized boolean isMovePending() {
+      return state == HistoryInfoState.IN_INTERMEDIATE
+          || state == HistoryInfoState.MOVE_FAILED;
     }
 
-    private Path getConfFile() {
-      return confFile;
+    private synchronized boolean didMoveFail() {
+      return state == HistoryInfoState.MOVE_FAILED;
+    }
+
+    /**
+     * @return true if the files backed by this were deleted.
+     */
+    public synchronized boolean isDeleted() {
+      return state == HistoryInfoState.DELETED;
+    }
+
+    private synchronized void moveToDone() throws IOException {
+      if (!isMovePending()) {
+        // It was either deleted or is already in done. Either way do nothing
+        return;
+      }
+      try {
+        long completeTime = jobIndexInfo.getFinishTime();
+        if (completeTime == 0) {
+          completeTime = System.currentTimeMillis();
+        }
+        JobId jobId = jobIndexInfo.getJobId();
+
+        List<Path> paths = new ArrayList<Path>(2);
+        if (historyFile == null) {
+          LOG.info("No file for job-history with " + jobId + " found in cache!");
+        } else {
+          paths.add(historyFile);
+        }
+
+        if (confFile == null) {
+          LOG.info("No file for jobConf with " + jobId + " found in cache!");
+        } else {
+          paths.add(confFile);
+        }
+
+        if (summaryFile == null) {
+          LOG.info("No summary file for job: " + jobId);
+        } else {
+          String jobSummaryString = getJobSummary(intermediateDoneDirFc,
+              summaryFile);
+          SUMMARY_LOG.info(jobSummaryString);
+          LOG.info("Deleting JobSummary file: [" + summaryFile + "]");
+          intermediateDoneDirFc.delete(summaryFile, false);
+          summaryFile = null;
+        }
+
+        Path targetDir = canonicalHistoryLogPath(jobId, completeTime);
+        addDirectoryToSerialNumberIndex(targetDir);
+        makeDoneSubdir(targetDir);
+        if (historyFile != null) {
+          Path toPath = doneDirFc.makeQualified(new Path(targetDir, historyFile
+              .getName()));
+          if (!toPath.equals(historyFile)) {
+            moveToDoneNow(historyFile, toPath);
+            historyFile = toPath;
+          }
+        }
+        if (confFile != null) {
+          Path toPath = doneDirFc.makeQualified(new Path(targetDir, confFile
+              .getName()));
+          if (!toPath.equals(confFile)) {
+            moveToDoneNow(confFile, toPath);
+            confFile = toPath;
+          }
+        }
+        state = HistoryInfoState.IN_DONE;
+      } catch (Throwable t) {
+        LOG.error("Error while trying to move a job to done", t);
+        this.state = HistoryInfoState.MOVE_FAILED;
+      }
     }
 
-    private Path getSummaryFile() {
-      return summaryFile;
+    /**
+     * Parse a job from the JobHistoryFile, if the underlying file is not going
+     * to be deleted.
+     *
+     * @return the Job or null if the underlying file was deleted.
+     * @throws IOException
+     *           if there is an error trying to read the file.
+     */
+    public synchronized Job loadJob() throws IOException {
+      return new CompletedJob(conf, jobIndexInfo.getJobId(), historyFile,
+          false, jobIndexInfo.getUser(), this, aclsMgr);
+    }
+
+    /**
+     * Return the history file. This should only be used for testing.
+     * @return the history file.
+     */
+    synchronized Path getHistoryFile() {
+      return historyFile;
+    }
+
+    private synchronized void delete() throws IOException {
+      state = HistoryInfoState.DELETED;
+      doneDirFc.delete(doneDirFc.makeQualified(historyFile), false);
+      doneDirFc.delete(doneDirFc.makeQualified(confFile), false);
     }
 
     public JobIndexInfo getJobIndexInfo() {
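
Taken together, HistoryFileInfo is a small state machine guarded by the object's own monitor: an instance starts IN_INTERMEDIATE (or IN_DONE when discovered already in the done directory), moveToDone() transitions it to IN_DONE on success and MOVE_FAILED on any error, and delete() is terminal. A compact restatement of the transitions (sketch, not part of the patch):

    // IN_INTERMEDIATE --moveToDone() ok---> IN_DONE
    // IN_INTERMEDIATE --moveToDone() err--> MOVE_FAILED
    // MOVE_FAILED     --moveToDone() ok---> IN_DONE   (isMovePending() still true)
    // any state       --delete()----------> DELETED

Note that MOVE_FAILED is retryable: isMovePending() treats it like IN_INTERMEDIATE, so a later scan can re-attempt the move.
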
@@ -104,57 +329,35 @@ public class HistoryFileManager extends AbstractService {
       return jobIndexInfo.getJobId();
     }
 
-    private void setHistoryFile(Path historyFile) {
-      this.historyFile = historyFile;
-    }
-
-    private void setConfFile(Path confFile) {
-      this.confFile = confFile;
+    public synchronized Path getConfFile() {
+      return confFile;
     }
-
-    private void setSummaryFile(Path summaryFile) {
-      this.summaryFile = summaryFile;
+
+    public synchronized Configuration loadConfFile() throws IOException {
+      FileContext fc = FileContext.getFileContext(confFile.toUri(), conf);
+      Configuration jobConf = new Configuration(false);
+      jobConf.addResource(fc.open(confFile));
+      return jobConf;
     }
   }
 
-  /**
-   * Maps between a serial number (generated based on jobId) and the timestamp
-   * component(s) to which it belongs. Facilitates jobId based searches. If a
-   * jobId is not found in this list - it will not be found.
-   */
-  private final SortedMap<String, Set<String>> idToDateString =
-      new TreeMap<String, Set<String>>();
-  // The number of entries in idToDateString
-  private int dateStringCacheSize;
-
-  // Maintains minimal details for recent jobs (parsed from history file name).
-  // Sorted on Job Completion Time.
-  private final SortedMap<JobId, MetaInfo> jobListCache =
-      new ConcurrentSkipListMap<JobId, MetaInfo>();
-  // The number of jobs to maintain in the job list cache.
-  private int jobListCacheSize;
-
-  // Re-use existing MetaInfo objects if they exist for the specific JobId.
-  // (synchronization on MetaInfo)
-  // Check for existence of the object when using iterators.
-  private final SortedMap<JobId, MetaInfo> intermediateListCache =
-      new ConcurrentSkipListMap<JobId, MetaInfo>();
+  private SerialNumberIndex serialNumberIndex = null;
+  private JobListCache jobListCache = null;
 
   // Maintains a list of known done subdirectories.
-  private final Set<Path> existingDoneSubdirs = new HashSet<Path>();
+  private final Set<Path> existingDoneSubdirs = Collections
+      .synchronizedSet(new HashSet<Path>());
 
   /**
    * Maintains a mapping between intermediate user directories and the last
    * known modification time.
    */
-  private Map<String, Long> userDirModificationTimeMap =
-      new HashMap<String, Long>();
+  private Map<String, Long> userDirModificationTimeMap = new HashMap<String, Long>();
 
   private JobACLsManager aclsMgr;
 
   private Configuration conf;
 
-  // TODO Remove me!!!!
   private boolean debugMode;
   private String serialNumberFormat;
 
@@ -165,6 +368,9 @@ public class HistoryFileManager extends AbstractService {
   private FileContext intermediateDoneDirFc; // Intermediate Done Dir
                                              // FileContext
 
+  private ThreadPoolExecutor moveToDoneExecutor = null;
+  private long maxHistoryAge = 0;
+
   public HistoryFileManager() {
     super(HistoryFileManager.class.getName());
   }
@@ -211,12 +417,25 @@ public class HistoryFileManager extends AbstractService {
 
     this.aclsMgr = new JobACLsManager(conf);
 
-    jobListCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE,
-        JHAdminConfig.DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE);
+    maxHistoryAge = conf.getLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS,
+        JHAdminConfig.DEFAULT_MR_HISTORY_MAX_AGE);
+
+    jobListCache = new JobListCache(conf.getInt(
+        JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE,
+        JHAdminConfig.DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE),
+        maxHistoryAge);
 
-    dateStringCacheSize = conf.getInt(
+    serialNumberIndex = new SerialNumberIndex(conf.getInt(
         JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE,
-        JHAdminConfig.DEFAULT_MR_HISTORY_DATESTRING_CACHE_SIZE);
+        JHAdminConfig.DEFAULT_MR_HISTORY_DATESTRING_CACHE_SIZE));
+
+    int numMoveThreads = conf.getInt(
+        JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT,
+        JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT);
+    ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
+        "MoveIntermediateToDone Thread #%d").build();
+    moveToDoneExecutor = new ThreadPoolExecutor(numMoveThreads, numMoveThreads,
+        1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
 
     super.init(conf);
   }
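
The executor configured here is a fixed-size pool with an unbounded queue and descriptively named threads. A standalone sketch of the same construction (the thread count of 3 is a stand-in for the configured value; ThreadFactoryBuilder is from Guava):

    ThreadFactory tf = new ThreadFactoryBuilder()
        .setNameFormat("MoveIntermediateToDone Thread #%d").build();
    ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 3,
        1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);

Because corePoolSize equals maximumPoolSize the pool never grows or shrinks, so the one-hour keep-alive only matters if core-thread timeout were enabled.
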
@@ -249,6 +468,7 @@ public class HistoryFileManager extends AbstractService {
   void initExisting() throws IOException {
     LOG.info("Initializing Existing Jobs...");
     List<FileStatus> timestampedDirList = findTimestampedDirectories();
+    // Sort first just so insertion is in a consistent order
     Collections.sort(timestampedDirList);
     for (FileStatus fs : timestampedDirList) {
       // TODO Could verify the correct format for these directories.
@@ -271,16 +491,7 @@ public class HistoryFileManager extends AbstractService {
           + serialDirPath.toString() + ". Continuing with next");
       return;
     }
-    synchronized (idToDateString) {
-      // TODO make this thread safe without the synchronize
-      if (idToDateString.containsKey(serialPart)) {
-        Set<String> set = idToDateString.get(serialPart);
-        set.remove(timeStampPart);
-        if (set.isEmpty()) {
-          idToDateString.remove(serialPart);
-        }
-      }
-    }
+    serialNumberIndex.remove(serialPart, timeStampPart);
   }
 
   private void addDirectoryToSerialNumberIndex(Path serialDirPath) {
@@ -299,21 +510,7 @@ public class HistoryFileManager extends AbstractService {
       LOG.warn("Could not find serial portion from path: "
           + serialDirPath.toString() + ". Continuing with next");
     }
-    addToSerialNumberIndex(serialPart, timestampPart);
-  }
-
-  private void addToSerialNumberIndex(String serialPart, String timestampPart) {
-    synchronized (idToDateString) {
-      // TODO make this thread safe without the synchronize
-      if (!idToDateString.containsKey(serialPart)) {
-        idToDateString.put(serialPart, new HashSet<String>());
-        if (idToDateString.size() > dateStringCacheSize) {
-          idToDateString.remove(idToDateString.firstKey());
-        }
-        Set<String> datePartSet = idToDateString.get(serialPart);
-        datePartSet.add(timestampPart);
-      }
-    }
+    serialNumberIndex.add(serialPart, timestampPart);
   }
 
   private void addDirectoryToJobListCache(Path path) throws IOException {
@@ -332,10 +529,10 @@ public class HistoryFileManager extends AbstractService {
           .getIntermediateConfFileName(jobIndexInfo.getJobId());
       String summaryFileName = JobHistoryUtils
           .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
-      MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
-          .getParent(), confFileName), new Path(fs.getPath().getParent(),
-          summaryFileName), jobIndexInfo);
-      addToJobListCache(metaInfo);
+      HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs
+          .getPath().getParent(), confFileName), new Path(fs.getPath()
+          .getParent(), summaryFileName), jobIndexInfo, true);
+      jobListCache.addIfAbsent(fileInfo);
     }
   }
 
@@ -371,25 +568,18 @@ public class HistoryFileManager extends AbstractService {
     return fsList;
   }
 
-  private void addToJobListCache(MetaInfo metaInfo) {
-    JobId jobId = metaInfo.getJobIndexInfo().getJobId();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Adding " + jobId + " to job list cache with "
-          + metaInfo.getJobIndexInfo());
-    }
-    jobListCache.put(jobId, metaInfo);
-    if (jobListCache.size() > jobListCacheSize) {
-      jobListCache.remove(jobListCache.firstKey());
-    }
-  }
-
   /**
    * Scans the intermediate directory to find user directories. Scans these for
-   * history files if the modification time for the directory has changed.
+   * history files if the modification time for the directory has changed. Once
+   * it finds history files it starts the process of moving them to the done
+   * directory.
    *
    * @throws IOException
+   *           if there was an error while scanning
   */
-  private void scanIntermediateDirectory() throws IOException {
+  void scanIntermediateDirectory() throws IOException {
+    // TODO it would be great to limit how often this happens, except in the
+    // case where we are looking for a particular job.
     List<FileStatus> userDirList = JobHistoryUtils.localGlobber(
         intermediateDoneDirFc, intermediateDoneDirPath, "");
 
@@ -405,7 +595,12 @@ public class HistoryFileManager extends AbstractService {
         }
       }
       if (shouldScan) {
-        scanIntermediateDirectory(userDir.getPath());
+        try {
+          scanIntermediateDirectory(userDir.getPath());
+        } catch (IOException e) {
+          LOG.error("Error while trying to scan the directory "
+              + userDir.getPath(), e);
+        }
       }
     }
   }
@@ -426,11 +621,33 @@ public class HistoryFileManager extends AbstractService {
           .getIntermediateConfFileName(jobIndexInfo.getJobId());
       String summaryFileName = JobHistoryUtils
           .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
-      MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
-          .getParent(), confFileName), new Path(fs.getPath().getParent(),
-          summaryFileName), jobIndexInfo);
-      if (!intermediateListCache.containsKey(jobIndexInfo.getJobId())) {
-        intermediateListCache.put(jobIndexInfo.getJobId(), metaInfo);
+      HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs
+          .getPath().getParent(), confFileName), new Path(fs.getPath()
+          .getParent(), summaryFileName), jobIndexInfo, false);
+
+      final HistoryFileInfo old = jobListCache.addIfAbsent(fileInfo);
+      if (old == null || old.didMoveFail()) {
+        final HistoryFileInfo found = (old == null) ? fileInfo : old;
+        long cutoff = System.currentTimeMillis() - maxHistoryAge;
+        if(found.getJobIndexInfo().getFinishTime() <= cutoff) {
+          try {
+            found.delete();
+          } catch (IOException e) {
+            LOG.warn("Error cleaning up a HistoryFile that is out of date.", e);
+          }
+        } else {
+          moveToDoneExecutor.execute(new Runnable() {
+            @Override
+            public void run() {
+              try {
+                found.moveToDone();
+              } catch (IOException e) {
+                LOG.info("Failed to process fileInfo for job: " +
+                    found.getJobId(), e);
+              }
+            }
+          });
+        }
       }
     }
   }
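
The scan therefore makes one decision per newly discovered job: delete it in place if it finished before the retention cutoff, otherwise hand the move to the executor so the scanning thread never blocks on HDFS renames. Reduced to a sketch (hypothetical info/executor objects, exception handling elided):

    // Sketch only: schedule-or-delete, as in the loop above.
    long cutoff = System.currentTimeMillis() - maxAgeMs;
    if (info.getFinishTime() <= cutoff) {
      info.delete();                      // too old: clean up immediately
    } else {
      executor.execute(new Runnable() {   // fresh: move asynchronously
        public void run() { info.moveToDone(); }
      });
    }
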
@@ -442,11 +659,11 @@ public class HistoryFileManager extends AbstractService {
    * fileStatus list of Job History Files.
    * @param jobId
    *          The JobId to find.
-   * @return A MetaInfo object for the jobId, null if not found.
+   * @return A FileInfo object for the jobId, null if not found.
    * @throws IOException
    */
-  private MetaInfo getJobMetaInfo(List<FileStatus> fileStatusList, JobId jobId)
-      throws IOException {
+  private HistoryFileInfo getJobFileInfo(List<FileStatus> fileStatusList,
+      JobId jobId) throws IOException {
     for (FileStatus fs : fileStatusList) {
       JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
           .getName());
@@ -455,10 +672,10 @@ public class HistoryFileManager extends AbstractService {
             .getIntermediateConfFileName(jobIndexInfo.getJobId());
         String summaryFileName = JobHistoryUtils
             .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
-        MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
-            .getParent(), confFileName), new Path(fs.getPath().getParent(),
-            summaryFileName), jobIndexInfo);
-        return metaInfo;
+        HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(
+            fs.getPath().getParent(), confFileName), new Path(fs.getPath()
+            .getParent(), summaryFileName), jobIndexInfo, true);
+        return fileInfo;
       }
     }
     return null;
@@ -474,175 +691,51 @@ public class HistoryFileManager extends AbstractService {
    * @return
    * @throws IOException
    */
-  private MetaInfo scanOldDirsForJob(JobId jobId) throws IOException {
+  private HistoryFileInfo scanOldDirsForJob(JobId jobId) throws IOException {
     int jobSerialNumber = JobHistoryUtils.jobSerialNumber(jobId);
     String boxedSerialNumber = String.valueOf(jobSerialNumber);
-    Set<String> dateStringSet;
-    synchronized (idToDateString) {
-      Set<String> found = idToDateString.get(boxedSerialNumber);
-      if (found == null) {
-        return null;
-      } else {
-        dateStringSet = new HashSet<String>(found);
-      }
+    Set<String> dateStringSet = serialNumberIndex.get(boxedSerialNumber);
+    if (dateStringSet == null) {
+      return null;
     }
     for (String timestampPart : dateStringSet) {
       Path logDir = canonicalHistoryLogPath(jobId, timestampPart);
       List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(logDir,
           doneDirFc);
-      MetaInfo metaInfo = getJobMetaInfo(fileStatusList, jobId);
-      if (metaInfo != null) {
-        return metaInfo;
+      HistoryFileInfo fileInfo = getJobFileInfo(fileStatusList, jobId);
+      if (fileInfo != null) {
+        return fileInfo;
       }
     }
     return null;
   }
 
-  /**
-   * Checks for the existence of the job history file in the intermediate
-   * directory.
-   *
-   * @param jobId
-   * @return
-   * @throws IOException
-   */
-  private MetaInfo scanIntermediateForJob(JobId jobId) throws IOException {
-    scanIntermediateDirectory();
-    return intermediateListCache.get(jobId);
-  }
-
-  /**
-   * Parse a job from the JobHistoryFile, if the underlying file is not going to
-   * be deleted.
-   *
-   * @param metaInfo
-   *          the where the JobHistory is stored.
-   * @return the Job or null if the underlying file was deleted.
-   * @throws IOException
-   *           if there is an error trying to read the file.
-   */
-  public Job loadJob(MetaInfo metaInfo) throws IOException {
-    return new CompletedJob(conf, metaInfo.getJobIndexInfo().getJobId(),
-        metaInfo.getHistoryFile(), false, metaInfo.getJobIndexInfo().getUser(),
-        metaInfo.getConfFile(), aclsMgr);
-  }
-
-  public Collection<MetaInfo> getAllMetaInfo() throws IOException {
-    scanIntermediateDirectory();
-    ArrayList<MetaInfo> result = new ArrayList<MetaInfo>();
-    result.addAll(intermediateListCache.values());
-    result.addAll(jobListCache.values());
-    return result;
-  }
-
-  Collection<MetaInfo> getIntermediateMetaInfos() throws IOException {
+  public Collection<HistoryFileInfo> getAllFileInfo() throws IOException {
     scanIntermediateDirectory();
-    return intermediateListCache.values();
+    return jobListCache.values();
   }
 
-  public MetaInfo getMetaInfo(JobId jobId) throws IOException {
-    // MetaInfo available in cache.
-    MetaInfo metaInfo = null;
-    if (jobListCache.containsKey(jobId)) {
-      metaInfo = jobListCache.get(jobId);
-    }
-
-    if (metaInfo != null) {
-      return metaInfo;
+  public HistoryFileInfo getFileInfo(JobId jobId) throws IOException {
+    // FileInfo available in cache.
+    HistoryFileInfo fileInfo = jobListCache.get(jobId);
+    if (fileInfo != null) {
+      return fileInfo;
     }
-
-    // MetaInfo not available. Check intermediate directory for meta info.
-    metaInfo = scanIntermediateForJob(jobId);
-    if (metaInfo != null) {
-      return metaInfo;
+    // OK so scan the intermediate to be sure we did not lose it that way
+    scanIntermediateDirectory();
+    fileInfo = jobListCache.get(jobId);
+    if (fileInfo != null) {
+      return fileInfo;
    }
 
     // Intermediate directory does not contain job. Search through older ones.
-    metaInfo = scanOldDirsForJob(jobId);
-    if (metaInfo != null) {
-      return metaInfo;
+    fileInfo = scanOldDirsForJob(jobId);
+    if (fileInfo != null) {
+      return fileInfo;
     }
     return null;
   }
 
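
getFileInfo() is now a three-level lookup: the in-memory JobListCache, then a rescan of the intermediate directory (which also schedules pending moves as a side effect), and finally the serial-number index over the done directory. Flattened into a sketch (hypothetical names):

    HistoryFileInfo info = cache.get(jobId);   // 1. cache hit, cheapest
    if (info == null) {
      rescanIntermediate();                    // 2. refresh from intermediate
      info = cache.get(jobId);
    }
    if (info == null) {
      info = scanOldDirsForJob(jobId);         // 3. walk the done directory
    }
    return info;
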
-  void moveToDone(MetaInfo metaInfo) throws IOException {
-    long completeTime = metaInfo.getJobIndexInfo().getFinishTime();
-    if (completeTime == 0)
-      completeTime = System.currentTimeMillis();
-    JobId jobId = metaInfo.getJobIndexInfo().getJobId();
-
-    List<Path> paths = new ArrayList<Path>();
-    Path historyFile = metaInfo.getHistoryFile();
-    if (historyFile == null) {
-      LOG.info("No file for job-history with " + jobId + " found in cache!");
-    } else {
-      paths.add(historyFile);
-    }
-
-    Path confFile = metaInfo.getConfFile();
-    if (confFile == null) {
-      LOG.info("No file for jobConf with " + jobId + " found in cache!");
-    } else {
-      paths.add(confFile);
-    }
-
-    // TODO Check all mi getters and setters for the conf path
-    Path summaryFile = metaInfo.getSummaryFile();
-    if (summaryFile == null) {
-      LOG.info("No summary file for job: " + jobId);
-    } else {
-      try {
-        String jobSummaryString = getJobSummary(intermediateDoneDirFc,
-            summaryFile);
-        SUMMARY_LOG.info(jobSummaryString);
-        LOG.info("Deleting JobSummary file: [" + summaryFile + "]");
-        intermediateDoneDirFc.delete(summaryFile, false);
-        metaInfo.setSummaryFile(null);
-      } catch (IOException e) {
-        LOG.warn("Failed to process summary file: [" + summaryFile + "]");
-        throw e;
-      }
-    }
-
-    Path targetDir = canonicalHistoryLogPath(jobId, completeTime);
-    addDirectoryToSerialNumberIndex(targetDir);
-    try {
-      makeDoneSubdir(targetDir);
-    } catch (IOException e) {
-      LOG.warn("Failed creating subdirectory: " + targetDir
-          + " while attempting to move files for jobId: " + jobId);
-      throw e;
-    }
-    synchronized (metaInfo) {
-      if (historyFile != null) {
-        Path toPath = doneDirFc.makeQualified(new Path(targetDir, historyFile
-            .getName()));
-        try {
-          moveToDoneNow(historyFile, toPath);
-        } catch (IOException e) {
-          LOG.warn("Failed to move file: " + historyFile + " for jobId: "
-              + jobId);
-          throw e;
-        }
-        metaInfo.setHistoryFile(toPath);
-      }
-      if (confFile != null) {
-        Path toPath = doneDirFc.makeQualified(new Path(targetDir, confFile
-            .getName()));
-        try {
-          moveToDoneNow(confFile, toPath);
-        } catch (IOException e) {
-          LOG.warn("Failed to move file: " + historyFile + " for jobId: "
-              + jobId);
-          throw e;
-        }
-        metaInfo.setConfFile(toPath);
-      }
-    }
-    addToJobListCache(metaInfo);
-    intermediateListCache.remove(jobId);
-  }
-
   private void moveToDoneNow(final Path src, final Path target)
       throws IOException {
     LOG.info("Moving " + src.toString() + " to " + target.toString());
@@ -658,20 +751,9 @@ public class HistoryFileManager extends AbstractService {
   }
 
   private void makeDoneSubdir(Path path) throws IOException {
-    boolean existsInExistingCache = false;
-    synchronized (existingDoneSubdirs) {
-      if (existingDoneSubdirs.contains(path))
-        existsInExistingCache = true;
-    }
     try {
       doneDirFc.getFileStatus(path);
-      if (!existsInExistingCache) {
-        existingDoneSubdirs.add(path);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("JobHistory.maybeMakeSubdirectory -- We believed " + path
-              + " already existed, but it didn't.");
-        }
-      }
+      existingDoneSubdirs.add(path);
     } catch (FileNotFoundException fnfE) {
       try {
         FsPermission fsp = new FsPermission(
@@ -685,11 +767,8 @@ public class HistoryFileManager extends AbstractService {
              + ", " + fsp);
           doneDirFc.setPermission(path, fsp);
         }
-        synchronized (existingDoneSubdirs) {
-          existingDoneSubdirs.add(path);
-        }
-      } catch (FileAlreadyExistsException faeE) {
-        // Nothing to do.
+        existingDoneSubdirs.add(path);
+      } catch (FileAlreadyExistsException faeE) { // Nothing to do.
       }
     }
   }
@@ -713,16 +792,22 @@ public class HistoryFileManager extends AbstractService {
     return finishTime;
   }
 
-  private void deleteJobFromDone(MetaInfo metaInfo) throws IOException {
-    jobListCache.remove(metaInfo.getJobId());
-    doneDirFc.delete(doneDirFc.makeQualified(metaInfo.getHistoryFile()), false);
-    doneDirFc.delete(doneDirFc.makeQualified(metaInfo.getConfFile()), false);
+  private void deleteJobFromDone(HistoryFileInfo fileInfo) throws IOException {
+    jobListCache.delete(fileInfo);
+    fileInfo.delete();
   }
 
+  /**
+   * Clean up older history files.
+   *
+   * @throws IOException
+   *           on any error trying to remove the entries.
+   */
   @SuppressWarnings("unchecked")
-  void clean(long cutoff, HistoryStorage storage) throws IOException {
+  void clean() throws IOException {
     // TODO this should be replaced by something that knows about the directory
     // structure and will put less of a load on HDFS.
+    long cutoff = System.currentTimeMillis() - maxHistoryAge;
     boolean halted = false;
     // TODO Delete YYYY/MM/DD directories.
     List<FileStatus> serialDirList = findTimestampedDirectories();
@@ -737,13 +822,17 @@ public class HistoryFileManager extends AbstractService {
         long effectiveTimestamp = getEffectiveTimestamp(
             jobIndexInfo.getFinishTime(), historyFile);
         if (effectiveTimestamp <= cutoff) {
-          String confFileName = JobHistoryUtils
-              .getIntermediateConfFileName(jobIndexInfo.getJobId());
-          MetaInfo metaInfo = new MetaInfo(historyFile.getPath(), new Path(
-              historyFile.getPath().getParent(), confFileName), null,
-              jobIndexInfo);
-          storage.jobRemovedFromHDFS(metaInfo.getJobId());
-          deleteJobFromDone(metaInfo);
+          HistoryFileInfo fileInfo = this.jobListCache.get(jobIndexInfo
+              .getJobId());
+          if (fileInfo == null) {
+            String confFileName = JobHistoryUtils
+                .getIntermediateConfFileName(jobIndexInfo.getJobId());
+
+            fileInfo = new HistoryFileInfo(historyFile.getPath(), new Path(
+                historyFile.getPath().getParent(), confFileName), null,
+                jobIndexInfo, true);
+          }
+          deleteJobFromDone(fileInfo);
         } else {
           halted = true;
           break;
@@ -752,9 +841,7 @@ public class HistoryFileManager extends AbstractService {
       if (!halted) {
        doneDirFc.delete(doneDirFc.makeQualified(serialDir.getPath()), true);
         removeDirectoryFromSerialNumberIndex(serialDir.getPath());
-        synchronized (existingDoneSubdirs) {
-          existingDoneSubdirs.remove(serialDir.getPath());
-        }
+        existingDoneSubdirs.remove(serialDir.getPath());
       } else {
         break; // Don't scan any more directories.
       }
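
The sweep deletes a whole serial directory only once every history file inside it was old enough; the first job found newer than the cutoff halts the sweep entirely. The eligibility test itself is simple arithmetic (one week shown as a stand-in for the configured maxHistoryAge):

    // Sketch only: a job is eligible when it finished more than maxHistoryAge ago.
    long maxHistoryAge = 7L * 24 * 60 * 60 * 1000; // stand-in for the config value
    long cutoff = System.currentTimeMillis() - maxHistoryAge;
    boolean eligible = jobFinishTime <= cutoff;    // hypothetical finish time
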