|
@@ -21,7 +21,6 @@ package org.apache.hadoop.mapreduce.v2.hs;
|
|
import java.io.FileNotFoundException;
|
|
import java.io.FileNotFoundException;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
-import java.util.Calendar;
|
|
|
|
import java.util.Collections;
|
|
import java.util.Collections;
|
|
import java.util.Comparator;
|
|
import java.util.Comparator;
|
|
import java.util.HashMap;
|
|
import java.util.HashMap;
|
|
@@ -36,8 +35,6 @@ import java.util.concurrent.LinkedBlockingQueue;
|
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.TimeUnit;
|
|
-import java.util.regex.Matcher;
|
|
|
|
-import java.util.regex.Pattern;
|
|
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -87,18 +84,18 @@ public class JobHistory extends AbstractService implements HistoryContext {
|
|
|
|
|
|
private static final Log SUMMARY_LOG = LogFactory.getLog(JobSummary.class);
|
|
private static final Log SUMMARY_LOG = LogFactory.getLog(JobSummary.class);
|
|
|
|
|
|
- private static final Pattern DATE_PATTERN = Pattern
|
|
|
|
- .compile("([0-1]?[0-9])/([0-3]?[0-9])/((?:2[0-9])[0-9][0-9])");
|
|
|
|
-
|
|
|
|
/*
|
|
/*
|
|
* TODO Get rid of this once JobId has it's own comparator
|
|
* TODO Get rid of this once JobId has it's own comparator
|
|
*/
|
|
*/
|
|
- private static final Comparator<JobId> JOB_ID_COMPARATOR = new Comparator<JobId>() {
|
|
|
|
|
|
+ private static final Comparator<JobId> JOB_ID_COMPARATOR =
|
|
|
|
+ new Comparator<JobId>() {
|
|
@Override
|
|
@Override
|
|
public int compare(JobId o1, JobId o2) {
|
|
public int compare(JobId o1, JobId o2) {
|
|
- if (o1.getAppId().getClusterTimestamp() > o2.getAppId().getClusterTimestamp()) {
|
|
|
|
|
|
+ if (o1.getAppId().getClusterTimestamp() >
|
|
|
|
+ o2.getAppId().getClusterTimestamp()) {
|
|
return 1;
|
|
return 1;
|
|
- } else if (o1.getAppId().getClusterTimestamp() < o2.getAppId().getClusterTimestamp()) {
|
|
|
|
|
|
+ } else if (o1.getAppId().getClusterTimestamp() <
|
|
|
|
+ o2.getAppId().getClusterTimestamp()) {
|
|
return -1;
|
|
return -1;
|
|
} else {
|
|
} else {
|
|
return o1.getId() - o2.getId();
|
|
return o1.getId() - o2.getId();
|
|
@@ -106,7 +103,8 @@ public class JobHistory extends AbstractService implements HistoryContext {
|
|
}
|
|
}
|
|
};
|
|
};
|
|
|
|
|
|
- private static String DONE_BEFORE_SERIAL_TAIL = JobHistoryUtils.doneSubdirsBeforeSerialTail();
|
|
|
|
|
|
+ private static String DONE_BEFORE_SERIAL_TAIL =
|
|
|
|
+ JobHistoryUtils.doneSubdirsBeforeSerialTail();
|
|
|
|
|
|
/**
|
|
/**
|
|
* Maps between a serial number (generated based on jobId) and the timestamp
|
|
* Maps between a serial number (generated based on jobId) and the timestamp
|
|
@@ -114,29 +112,32 @@ public class JobHistory extends AbstractService implements HistoryContext {
|
|
* Facilitates jobId based searches.
|
|
* Facilitates jobId based searches.
|
|
* If a jobId is not found in this list - it will not be found.
|
|
* If a jobId is not found in this list - it will not be found.
|
|
*/
|
|
*/
|
|
- private final SortedMap<String, Set<String>> idToDateString = new ConcurrentSkipListMap<String, Set<String>>();
|
|
|
|
|
|
+ private final SortedMap<String, Set<String>> idToDateString =
|
|
|
|
+ new ConcurrentSkipListMap<String, Set<String>>();
|
|
|
|
|
|
//Maintains minimal details for recent jobs (parsed from history file name).
|
|
//Maintains minimal details for recent jobs (parsed from history file name).
|
|
//Sorted on Job Completion Time.
|
|
//Sorted on Job Completion Time.
|
|
- private final SortedMap<JobId, MetaInfo> jobListCache = new ConcurrentSkipListMap<JobId, MetaInfo>(
|
|
|
|
- JOB_ID_COMPARATOR);
|
|
|
|
|
|
+ private final SortedMap<JobId, MetaInfo> jobListCache =
|
|
|
|
+ new ConcurrentSkipListMap<JobId, MetaInfo>(JOB_ID_COMPARATOR);
|
|
|
|
|
|
|
|
|
|
// Re-use exisiting MetaInfo objects if they exist for the specific JobId. (synchronization on MetaInfo)
|
|
// Re-use exisiting MetaInfo objects if they exist for the specific JobId. (synchronization on MetaInfo)
|
|
// Check for existance of the object when using iterators.
|
|
// Check for existance of the object when using iterators.
|
|
- private final SortedMap<JobId, MetaInfo> intermediateListCache = new ConcurrentSkipListMap<JobId, JobHistory.MetaInfo>(
|
|
|
|
- JOB_ID_COMPARATOR);
|
|
|
|
|
|
+ private final SortedMap<JobId, MetaInfo> intermediateListCache =
|
|
|
|
+ new ConcurrentSkipListMap<JobId, JobHistory.MetaInfo>(JOB_ID_COMPARATOR);
|
|
|
|
|
|
//Maintains a list of known done subdirectories. Not currently used.
|
|
//Maintains a list of known done subdirectories. Not currently used.
|
|
private final Set<Path> existingDoneSubdirs = new HashSet<Path>();
|
|
private final Set<Path> existingDoneSubdirs = new HashSet<Path>();
|
|
|
|
|
|
- private final SortedMap<JobId, Job> loadedJobCache = new ConcurrentSkipListMap<JobId, Job>(
|
|
|
|
- JOB_ID_COMPARATOR);
|
|
|
|
|
|
+ private final SortedMap<JobId, Job> loadedJobCache =
|
|
|
|
+ new ConcurrentSkipListMap<JobId, Job>(JOB_ID_COMPARATOR);
|
|
|
|
|
|
/**
|
|
/**
|
|
- * Maintains a mapping between intermediate user directories and the last known modification time.
|
|
|
|
|
|
+ * Maintains a mapping between intermediate user directories and the last
|
|
|
|
+ * known modification time.
|
|
*/
|
|
*/
|
|
- private Map<String, Long> userDirModificationTimeMap = new HashMap<String, Long>();
|
|
|
|
|
|
+ private Map<String, Long> userDirModificationTimeMap =
|
|
|
|
+ new HashMap<String, Long>();
|
|
|
|
|
|
//The number of jobs to maintain in the job list cache.
|
|
//The number of jobs to maintain in the job list cache.
|
|
private int jobListCacheSize;
|
|
private int jobListCacheSize;
|
|
@@ -187,7 +188,8 @@ public class JobHistory extends AbstractService implements HistoryContext {
|
|
debugMode = conf.getBoolean(JHAdminConfig.MR_HISTORY_DEBUG_MODE, false);
|
|
debugMode = conf.getBoolean(JHAdminConfig.MR_HISTORY_DEBUG_MODE, false);
|
|
serialNumberLowDigits = debugMode ? 1 : 3;
|
|
serialNumberLowDigits = debugMode ? 1 : 3;
|
|
serialNumberFormat = ("%0"
|
|
serialNumberFormat = ("%0"
|
|
- + (JobHistoryUtils.SERIAL_NUMBER_DIRECTORY_DIGITS + serialNumberLowDigits) + "d");
|
|
|
|
|
|
+ + (JobHistoryUtils.SERIAL_NUMBER_DIRECTORY_DIGITS
|
|
|
|
+ + serialNumberLowDigits) + "d");
|
|
|
|
|
|
String doneDirPrefix = null;
|
|
String doneDirPrefix = null;
|
|
doneDirPrefix = JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
|
|
doneDirPrefix = JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
|
|
@@ -195,9 +197,11 @@ public class JobHistory extends AbstractService implements HistoryContext {
|
|
doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified(
|
|
doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified(
|
|
new Path(doneDirPrefix));
|
|
new Path(doneDirPrefix));
|
|
doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf);
|
|
doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf);
|
|
- mkdir(doneDirFc, doneDirPrefixPath, new FsPermission(JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION));
|
|
|
|
|
|
+ mkdir(doneDirFc, doneDirPrefixPath, new FsPermission(
|
|
|
|
+ JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION));
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
- throw new YarnException("Error creating done directory: [" + doneDirPrefixPath + "]", e);
|
|
|
|
|
|
+ throw new YarnException("Error creating done directory: [" +
|
|
|
|
+ doneDirPrefixPath + "]", e);
|
|
}
|
|
}
|
|
|
|
|
|
String intermediateDoneDirPrefix = null;
|
|
String intermediateDoneDirPrefix = null;
|
|
@@ -208,21 +212,27 @@ public class JobHistory extends AbstractService implements HistoryContext {
|
|
.makeQualified(new Path(intermediateDoneDirPrefix));
|
|
.makeQualified(new Path(intermediateDoneDirPrefix));
|
|
intermediateDoneDirFc = FileContext.getFileContext(
|
|
intermediateDoneDirFc = FileContext.getFileContext(
|
|
intermediateDoneDirPath.toUri(), conf);
|
|
intermediateDoneDirPath.toUri(), conf);
|
|
- mkdir(intermediateDoneDirFc, intermediateDoneDirPath, new FsPermission(JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS.toShort()));
|
|
|
|
|
|
+ mkdir(intermediateDoneDirFc, intermediateDoneDirPath, new FsPermission(
|
|
|
|
+ JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS.toShort()));
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
LOG.info("error creating done directory on dfs " + e);
|
|
LOG.info("error creating done directory on dfs " + e);
|
|
- throw new YarnException("Error creating intermediate done directory: [" + intermediateDoneDirPath + "]", e);
|
|
|
|
|
|
+ throw new YarnException("Error creating intermediate done directory: ["
|
|
|
|
+ + intermediateDoneDirPath + "]", e);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
- jobListCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE, DEFAULT_JOBLIST_CACHE_SIZE);
|
|
|
|
- loadedJobCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE, DEFAULT_LOADEDJOB_CACHE_SIZE);
|
|
|
|
- dateStringCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE, DEFAULT_DATESTRING_CACHE_SIZE);
|
|
|
|
|
|
+ jobListCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE,
|
|
|
|
+ DEFAULT_JOBLIST_CACHE_SIZE);
|
|
|
|
+ loadedJobCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE,
|
|
|
|
+ DEFAULT_LOADEDJOB_CACHE_SIZE);
|
|
|
|
+ dateStringCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE,
|
|
|
|
+ DEFAULT_DATESTRING_CACHE_SIZE);
|
|
moveThreadInterval =
|
|
moveThreadInterval =
|
|
conf.getLong(JHAdminConfig.MR_HISTORY_MOVE_INTERVAL_MS,
|
|
conf.getLong(JHAdminConfig.MR_HISTORY_MOVE_INTERVAL_MS,
|
|
DEFAULT_MOVE_THREAD_INTERVAL);
|
|
DEFAULT_MOVE_THREAD_INTERVAL);
|
|
- numMoveThreads = conf.getInt(JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT, DEFAULT_MOVE_THREAD_COUNT);
|
|
|
|
|
|
+ numMoveThreads = conf.getInt(JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT,
|
|
|
|
+ DEFAULT_MOVE_THREAD_COUNT);
|
|
try {
|
|
try {
|
|
initExisting();
|
|
initExisting();
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
@@ -254,19 +264,21 @@ public class JobHistory extends AbstractService implements HistoryContext {
|
|
@Override
|
|
@Override
|
|
public void start() {
|
|
public void start() {
|
|
//Start moveIntermediatToDoneThread
|
|
//Start moveIntermediatToDoneThread
|
|
- moveIntermediateToDoneRunnable = new MoveIntermediateToDoneRunnable(moveThreadInterval, numMoveThreads);
|
|
|
|
|
|
+ moveIntermediateToDoneRunnable =
|
|
|
|
+ new MoveIntermediateToDoneRunnable(moveThreadInterval, numMoveThreads);
|
|
moveIntermediateToDoneThread = new Thread(moveIntermediateToDoneRunnable);
|
|
moveIntermediateToDoneThread = new Thread(moveIntermediateToDoneRunnable);
|
|
moveIntermediateToDoneThread.setName("MoveIntermediateToDoneScanner");
|
|
moveIntermediateToDoneThread.setName("MoveIntermediateToDoneScanner");
|
|
moveIntermediateToDoneThread.start();
|
|
moveIntermediateToDoneThread.start();
|
|
|
|
|
|
//Start historyCleaner
|
|
//Start historyCleaner
|
|
- boolean startCleanerService = conf.getBoolean(JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, true);
|
|
|
|
|
|
+ boolean startCleanerService = conf.getBoolean(
|
|
|
|
+ JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, true);
|
|
if (startCleanerService) {
|
|
if (startCleanerService) {
|
|
- long maxAgeOfHistoryFiles = conf.getLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS,
|
|
|
|
- DEFAULT_HISTORY_MAX_AGE);
|
|
|
|
|
|
+ long maxAgeOfHistoryFiles = conf.getLong(
|
|
|
|
+ JHAdminConfig.MR_HISTORY_MAX_AGE_MS, DEFAULT_HISTORY_MAX_AGE);
|
|
cleanerScheduledExecutor = new ScheduledThreadPoolExecutor(1);
|
|
cleanerScheduledExecutor = new ScheduledThreadPoolExecutor(1);
|
|
- long runInterval = conf.getLong(JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS,
|
|
|
|
- DEFAULT_RUN_INTERVAL);
|
|
|
|
|
|
+ long runInterval = conf.getLong(
|
|
|
|
+ JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS, DEFAULT_RUN_INTERVAL);
|
|
cleanerScheduledExecutor
|
|
cleanerScheduledExecutor
|
|
.scheduleAtFixedRate(new HistoryCleaner(maxAgeOfHistoryFiles),
|
|
.scheduleAtFixedRate(new HistoryCleaner(maxAgeOfHistoryFiles),
|
|
30 * 1000l, runInterval, TimeUnit.MILLISECONDS);
|
|
30 * 1000l, runInterval, TimeUnit.MILLISECONDS);
|
|
@@ -331,13 +343,16 @@ public class JobHistory extends AbstractService implements HistoryContext {
|
|
|
|
|
|
private void removeDirectoryFromSerialNumberIndex(Path serialDirPath) {
|
|
private void removeDirectoryFromSerialNumberIndex(Path serialDirPath) {
|
|
String serialPart = serialDirPath.getName();
|
|
String serialPart = serialDirPath.getName();
|
|
- String timeStampPart = JobHistoryUtils.getTimestampPartFromPath(serialDirPath.toString());
|
|
|
|
|
|
+ String timeStampPart =
|
|
|
|
+ JobHistoryUtils.getTimestampPartFromPath(serialDirPath.toString());
|
|
if (timeStampPart == null) {
|
|
if (timeStampPart == null) {
|
|
- LOG.warn("Could not find timestamp portion from path: " + serialDirPath.toString() +". Continuing with next");
|
|
|
|
|
|
+ LOG.warn("Could not find timestamp portion from path: " +
|
|
|
|
+ serialDirPath.toString() +". Continuing with next");
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
if (serialPart == null) {
|
|
if (serialPart == null) {
|
|
- LOG.warn("Could not find serial portion from path: " + serialDirPath.toString() + ". Continuing with next");
|
|
|
|
|
|
+ LOG.warn("Could not find serial portion from path: " +
|
|
|
|
+ serialDirPath.toString() + ". Continuing with next");
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
if (idToDateString.containsKey(serialPart)) {
|
|
if (idToDateString.containsKey(serialPart)) {
|
|
@@ -355,13 +370,16 @@ public class JobHistory extends AbstractService implements HistoryContext {
|
|
LOG.debug("Adding "+serialDirPath+" to serial index");
|
|
LOG.debug("Adding "+serialDirPath+" to serial index");
|
|
}
|
|
}
|
|
String serialPart = serialDirPath.getName();
|
|
String serialPart = serialDirPath.getName();
|
|
- String timestampPart = JobHistoryUtils.getTimestampPartFromPath(serialDirPath.toString());
|
|
|
|
|
|
+ String timestampPart =
|
|
|
|
+ JobHistoryUtils.getTimestampPartFromPath(serialDirPath.toString());
|
|
if (timestampPart == null) {
|
|
if (timestampPart == null) {
|
|
- LOG.warn("Could not find timestamp portion from path: " + serialDirPath.toString() +". Continuing with next");
|
|
|
|
|
|
+ LOG.warn("Could not find timestamp portion from path: " +
|
|
|
|
+ serialDirPath.toString() +". Continuing with next");
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
if (serialPart == null) {
|
|
if (serialPart == null) {
|
|
- LOG.warn("Could not find serial portion from path: " + serialDirPath.toString() + ". Continuing with next");
|
|
|
|
|
|
+ LOG.warn("Could not find serial portion from path: " +
|
|
|
|
+ serialDirPath.toString() + ". Continuing with next");
|
|
}
|
|
}
|
|
addToSerialNumberIndex(serialPart, timestampPart);
|
|
addToSerialNumberIndex(serialPart, timestampPart);
|
|
}
|
|
}
|
|
@@ -400,7 +418,8 @@ public class JobHistory extends AbstractService implements HistoryContext {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private static List<FileStatus> scanDirectory(Path path, FileContext fc, PathFilter pathFilter) throws IOException {
|
|
|
|
|
|
+ private static List<FileStatus> scanDirectory(Path path, FileContext fc,
|
|
|
|
+ PathFilter pathFilter) throws IOException {
|
|
path = fc.makeQualified(path);
|
|
path = fc.makeQualified(path);
|
|
List<FileStatus> jhStatusList = new ArrayList<FileStatus>();
|
|
List<FileStatus> jhStatusList = new ArrayList<FileStatus>();
|
|
RemoteIterator<FileStatus> fileStatusIter = fc.listStatus(path);
|
|
RemoteIterator<FileStatus> fileStatusIter = fc.listStatus(path);
|
|
@@ -414,7 +433,8 @@ public class JobHistory extends AbstractService implements HistoryContext {
|
|
return jhStatusList;
|
|
return jhStatusList;
|
|
}
|
|
}
|
|
|
|
|
|
- private static List<FileStatus> scanDirectoryForHistoryFiles(Path path, FileContext fc) throws IOException {
|
|
|
|
|
|
+ private static List<FileStatus> scanDirectoryForHistoryFiles(Path path,
|
|
|
|
+ FileContext fc) throws IOException {
|
|
return scanDirectory(path, fc, JobHistoryUtils.getHistoryFileFilter());
|
|
return scanDirectory(path, fc, JobHistoryUtils.getHistoryFileFilter());
|
|
}
|
|
}
|
|
|
|
|
|
@@ -425,7 +445,8 @@ public class JobHistory extends AbstractService implements HistoryContext {
|
|
* @return
|
|
* @return
|
|
*/
|
|
*/
|
|
private List<FileStatus> findTimestampedDirectories() throws IOException {
|
|
private List<FileStatus> findTimestampedDirectories() throws IOException {
|
|
- List<FileStatus> fsList = JobHistoryUtils.localGlobber(doneDirFc, doneDirPrefixPath, DONE_BEFORE_SERIAL_TAIL);
|
|
|
|
|
|
+ List<FileStatus> fsList = JobHistoryUtils.localGlobber(doneDirFc,
|
|
|
|
+ doneDirPrefixPath, DONE_BEFORE_SERIAL_TAIL);
|
|
return fsList;
|
|
return fsList;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -434,7 +455,8 @@ public class JobHistory extends AbstractService implements HistoryContext {
|
|
*/
|
|
*/
|
|
private void addToJobListCache(JobId jobId, MetaInfo metaInfo) {
|
|
private void addToJobListCache(JobId jobId, MetaInfo metaInfo) {
|
|
if(LOG.isDebugEnabled()) {
|
|
if(LOG.isDebugEnabled()) {
|
|
- LOG.debug("Adding "+jobId+" to job list cache with "+metaInfo.getJobIndexInfo());
|
|
|
|
|
|
+ LOG.debug("Adding "+jobId+" to job list cache with "
|
|
|
|
+ +metaInfo.getJobIndexInfo());
|
|
}
|
|
}
|
|
jobListCache.put(jobId, metaInfo);
|
|
jobListCache.put(jobId, metaInfo);
|
|
if (jobListCache.size() > jobListCacheSize) {
|
|
if (jobListCache.size() > jobListCacheSize) {
|
|
@@ -462,14 +484,16 @@ public class JobHistory extends AbstractService implements HistoryContext {
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
private void scanIntermediateDirectory() throws IOException {
|
|
private void scanIntermediateDirectory() throws IOException {
|
|
- List<FileStatus> userDirList = JobHistoryUtils.localGlobber(intermediateDoneDirFc, intermediateDoneDirPath, "");
|
|
|
|
|
|
+ List<FileStatus> userDirList =
|
|
|
|
+ JobHistoryUtils.localGlobber(intermediateDoneDirFc, intermediateDoneDirPath, "");
|
|
|
|
|
|
for (FileStatus userDir : userDirList) {
|
|
for (FileStatus userDir : userDirList) {
|
|
String name = userDir.getPath().getName();
|
|
String name = userDir.getPath().getName();
|
|
long newModificationTime = userDir.getModificationTime();
|
|
long newModificationTime = userDir.getModificationTime();
|
|
boolean shouldScan = false;
|
|
boolean shouldScan = false;
|
|
synchronized (userDirModificationTimeMap) {
|
|
synchronized (userDirModificationTimeMap) {
|
|
- if (!userDirModificationTimeMap.containsKey(name) || newModificationTime > userDirModificationTimeMap.get(name)) {
|
|
|
|
|
|
+ if (!userDirModificationTimeMap.containsKey(name) || newModificationTime
|
|
|
|
+ > userDirModificationTimeMap.get(name)) {
|
|
shouldScan = true;
|
|
shouldScan = true;
|
|
userDirModificationTimeMap.put(name, newModificationTime);
|
|
userDirModificationTimeMap.put(name, newModificationTime);
|
|
}
|
|
}
|
|
@@ -514,9 +538,11 @@ public class JobHistory extends AbstractService implements HistoryContext {
|
|
* @return A MetaInfo object for the jobId, null if not found.
|
|
* @return A MetaInfo object for the jobId, null if not found.
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
- private MetaInfo getJobMetaInfo(List<FileStatus> fileStatusList, JobId jobId) throws IOException {
|
|
|
|
|
|
+ private MetaInfo getJobMetaInfo(List<FileStatus> fileStatusList, JobId jobId)
|
|
|
|
+ throws IOException {
|
|
for (FileStatus fs : fileStatusList) {
|
|
for (FileStatus fs : fileStatusList) {
|
|
- JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath().getName());
|
|
|
|
|
|
+ JobIndexInfo jobIndexInfo =
|
|
|
|
+ FileNameIndexUtils.getIndexInfo(fs.getPath().getName());
|
|
if (jobIndexInfo.getJobId().equals(jobId)) {
|
|
if (jobIndexInfo.getJobId().equals(jobId)) {
|
|
String confFileName = JobHistoryUtils
|
|
String confFileName = JobHistoryUtils
|
|
.getIntermediateConfFileName(jobIndexInfo.getJobId());
|
|
.getIntermediateConfFileName(jobIndexInfo.getJobId());
|
|
@@ -549,7 +575,8 @@ public class JobHistory extends AbstractService implements HistoryContext {
|
|
}
|
|
}
|
|
for (String timestampPart : dateStringSet) {
|
|
for (String timestampPart : dateStringSet) {
|
|
Path logDir = canonicalHistoryLogPath(jobId, timestampPart);
|
|
Path logDir = canonicalHistoryLogPath(jobId, timestampPart);
|
|
- List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(logDir, doneDirFc);
|
|
|
|
|
|
+ List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(logDir,
|
|
|
|
+ doneDirFc);
|
|
MetaInfo metaInfo = getJobMetaInfo(fileStatusList, jobId);
|
|
MetaInfo metaInfo = getJobMetaInfo(fileStatusList, jobId);
|
|
if (metaInfo != null) {
|
|
if (metaInfo != null) {
|
|
return metaInfo;
|
|
return metaInfo;
|
|
@@ -559,7 +586,8 @@ public class JobHistory extends AbstractService implements HistoryContext {
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * Checks for the existence of the job history file in the interemediate directory.
|
|
|
|
|
|
+ * Checks for the existence of the job history file in the intermediate
|
|
|
|
+ * directory.
|
|
* @param jobId
|
|
* @param jobId
|
|
* @return
|
|
* @return
|
|
* @throws IOException
|
|
* @throws IOException
|
|
@@ -586,7 +614,8 @@ public class JobHistory extends AbstractService implements HistoryContext {
|
|
|
|
|
|
MoveIntermediateToDoneRunnable(long sleepTime, int numMoveThreads) {
|
|
MoveIntermediateToDoneRunnable(long sleepTime, int numMoveThreads) {
|
|
this.sleepTime = sleepTime;
|
|
this.sleepTime = sleepTime;
|
|
- moveToDoneExecutor = new ThreadPoolExecutor(1, numMoveThreads, 1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
|
|
|
|
|
|
+ moveToDoneExecutor = new ThreadPoolExecutor(1, numMoveThreads, 1,
|
|
|
|
+ TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
|
|
running = true;
|
|
running = true;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -604,7 +633,8 @@ public class JobHistory extends AbstractService implements HistoryContext {
|
|
try {
|
|
try {
|
|
moveToDone(metaInfo);
|
|
moveToDone(metaInfo);
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
- LOG.info("Failed to process metaInfo for job: " + metaInfo.jobIndexInfo.getJobId(), e);
|
|
|
|
|
|
+ LOG.info("Failed to process metaInfo for job: " +
|
|
|
|
+ metaInfo.jobIndexInfo.getJobId(), e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
});
|
|
});
|
|
@@ -629,38 +659,17 @@ public class JobHistory extends AbstractService implements HistoryContext {
|
|
synchronized(metaInfo) {
|
|
synchronized(metaInfo) {
|
|
try {
|
|
try {
|
|
Job job = new CompletedJob(conf, metaInfo.getJobIndexInfo().getJobId(),
|
|
Job job = new CompletedJob(conf, metaInfo.getJobIndexInfo().getJobId(),
|
|
- metaInfo.getHistoryFile(), true, metaInfo.getJobIndexInfo().getUser());
|
|
|
|
|
|
+ metaInfo.getHistoryFile(), true, metaInfo.getJobIndexInfo().getUser(),
|
|
|
|
+ metaInfo.getConfFile());
|
|
addToLoadedJobCache(job);
|
|
addToLoadedJobCache(job);
|
|
return job;
|
|
return job;
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
- throw new YarnException("Could not find/load job: " + metaInfo.getJobIndexInfo().getJobId(), e);
|
|
|
|
|
|
+ throw new YarnException("Could not find/load job: " +
|
|
|
|
+ metaInfo.getJobIndexInfo().getJobId(), e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private SortedMap<JobId, JobIndexInfo> getAllJobsMetaInfo() {
|
|
|
|
- SortedMap<JobId, JobIndexInfo> result = new TreeMap<JobId, JobIndexInfo>(JOB_ID_COMPARATOR);
|
|
|
|
- try {
|
|
|
|
- scanIntermediateDirectory();
|
|
|
|
- } catch (IOException e) {
|
|
|
|
- LOG.warn("Failed to scan intermediate directory", e);
|
|
|
|
- throw new YarnException(e);
|
|
|
|
- }
|
|
|
|
- for (JobId jobId : intermediateListCache.keySet()) {
|
|
|
|
- MetaInfo mi = intermediateListCache.get(jobId);
|
|
|
|
- if (mi != null) {
|
|
|
|
- result.put(jobId, mi.getJobIndexInfo());
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- for (JobId jobId : jobListCache.keySet()) {
|
|
|
|
- MetaInfo mi = jobListCache.get(jobId);
|
|
|
|
- if (mi != null) {
|
|
|
|
- result.put(jobId, mi.getJobIndexInfo());
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- return result;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
private Map<JobId, Job> getAllJobsInternal() {
|
|
private Map<JobId, Job> getAllJobsInternal() {
|
|
//TODO This should ideally be using getAllJobsMetaInfo
|
|
//TODO This should ideally be using getAllJobsMetaInfo
|
|
// or get rid of that method once Job has APIs for user, finishTime etc.
|
|
// or get rid of that method once Job has APIs for user, finishTime etc.
|
|
@@ -746,108 +755,6 @@ public class JobHistory extends AbstractService implements HistoryContext {
|
|
return null;
|
|
return null;
|
|
}
|
|
}
|
|
|
|
|
|
- /**
|
|
|
|
- * Searches cached jobs for the specified criteria (AND). Ignores the criteria if null.
|
|
|
|
- * @param soughtUser
|
|
|
|
- * @param soughtJobNameSubstring
|
|
|
|
- * @param soughtDateStrings
|
|
|
|
- * @return
|
|
|
|
- */
|
|
|
|
- private Map<JobId, Job> findJobs(String soughtUser, String soughtJobNameSubstring, String[] soughtDateStrings) {
|
|
|
|
- boolean searchUser = true;
|
|
|
|
- boolean searchJobName = true;
|
|
|
|
- boolean searchDates = true;
|
|
|
|
- List<Calendar> soughtCalendars = null;
|
|
|
|
-
|
|
|
|
- if (soughtUser == null) {
|
|
|
|
- searchUser = false;
|
|
|
|
- }
|
|
|
|
- if (soughtJobNameSubstring == null) {
|
|
|
|
- searchJobName = false;
|
|
|
|
- }
|
|
|
|
- if (soughtDateStrings == null) {
|
|
|
|
- searchDates = false;
|
|
|
|
- } else {
|
|
|
|
- soughtCalendars = getSoughtDateAsCalendar(soughtDateStrings);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- Map<JobId, Job> resultMap = new TreeMap<JobId, Job>();
|
|
|
|
-
|
|
|
|
- SortedMap<JobId, JobIndexInfo> allJobs = getAllJobsMetaInfo();
|
|
|
|
- for (Map.Entry<JobId, JobIndexInfo> entry : allJobs.entrySet()) {
|
|
|
|
- JobId jobId = entry.getKey();
|
|
|
|
- JobIndexInfo indexInfo = entry.getValue();
|
|
|
|
- String jobName = indexInfo.getJobName();
|
|
|
|
- String jobUser = indexInfo.getUser();
|
|
|
|
- long finishTime = indexInfo.getFinishTime();
|
|
|
|
-
|
|
|
|
- if (searchUser) {
|
|
|
|
- if (!soughtUser.equals(jobUser)) {
|
|
|
|
- continue;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if (searchJobName) {
|
|
|
|
- if (!jobName.contains(soughtJobNameSubstring)) {
|
|
|
|
- continue;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if (searchDates) {
|
|
|
|
- boolean matchedDate = false;
|
|
|
|
- Calendar jobCal = Calendar.getInstance();
|
|
|
|
- jobCal.setTimeInMillis(finishTime);
|
|
|
|
- for (Calendar cal : soughtCalendars) {
|
|
|
|
- if (jobCal.get(Calendar.YEAR) == cal.get(Calendar.YEAR) &&
|
|
|
|
- jobCal.get(Calendar.MONTH) == cal.get(Calendar.MONTH) &&
|
|
|
|
- jobCal.get(Calendar.DAY_OF_MONTH) == cal.get(Calendar.DAY_OF_MONTH)) {
|
|
|
|
- matchedDate = true;
|
|
|
|
- break;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- if (!matchedDate) {
|
|
|
|
- break;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- resultMap.put(jobId, new PartialJob(indexInfo, jobId));
|
|
|
|
- }
|
|
|
|
- return resultMap;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private List<Calendar> getSoughtDateAsCalendar(String [] soughtDateStrings) {
|
|
|
|
- List<Calendar> soughtCalendars = new ArrayList<Calendar>();
|
|
|
|
- for (int i = 0 ; i < soughtDateStrings.length ; i++) {
|
|
|
|
- String soughtDate = soughtDateStrings[i];
|
|
|
|
- if (soughtDate.length() != 0) {
|
|
|
|
- Matcher m = DATE_PATTERN.matcher(soughtDate);
|
|
|
|
- if (m.matches()) {
|
|
|
|
- String yyyyPart = m.group(3);
|
|
|
|
- String mmPart = m.group(1);
|
|
|
|
- String ddPart = m.group(2);
|
|
|
|
-
|
|
|
|
- if (yyyyPart.length() == 2) {
|
|
|
|
- yyyyPart = "20" + yyyyPart;
|
|
|
|
- }
|
|
|
|
- if (mmPart.length() == 1) {
|
|
|
|
- mmPart = "0" + mmPart;
|
|
|
|
- }
|
|
|
|
- if (ddPart.length() == 1) {
|
|
|
|
- ddPart = "0" + ddPart;
|
|
|
|
- }
|
|
|
|
- Calendar soughtCal = Calendar.getInstance();
|
|
|
|
- soughtCal.set(Calendar.YEAR, Integer.parseInt(yyyyPart));
|
|
|
|
- soughtCal.set(Calendar.MONTH, Integer.parseInt(mmPart) - 1);
|
|
|
|
- soughtCal.set(Calendar.DAY_OF_MONTH, Integer.parseInt(ddPart) -1);
|
|
|
|
- soughtCalendars.add(soughtCal);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- return soughtCalendars;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-
|
|
|
|
private void moveToDone(MetaInfo metaInfo) throws IOException {
|
|
private void moveToDone(MetaInfo metaInfo) throws IOException {
|
|
long completeTime = metaInfo.getJobIndexInfo().getFinishTime();
|
|
long completeTime = metaInfo.getJobIndexInfo().getFinishTime();
|
|
if (completeTime == 0) completeTime = System.currentTimeMillis();
|
|
if (completeTime == 0) completeTime = System.currentTimeMillis();
|
|
@@ -890,26 +797,31 @@ public class JobHistory extends AbstractService implements HistoryContext {
|
|
try {
|
|
try {
|
|
maybeMakeSubdirectory(targetDir);
|
|
maybeMakeSubdirectory(targetDir);
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
- LOG.warn("Failed creating subdirectory: " + targetDir + " while attempting to move files for jobId: " + jobId);
|
|
|
|
|
|
+ LOG.warn("Failed creating subdirectory: " + targetDir +
|
|
|
|
+ " while attempting to move files for jobId: " + jobId);
|
|
throw e;
|
|
throw e;
|
|
}
|
|
}
|
|
synchronized (metaInfo) {
|
|
synchronized (metaInfo) {
|
|
if (historyFile != null) {
|
|
if (historyFile != null) {
|
|
- Path toPath = doneDirFc.makeQualified(new Path(targetDir, historyFile.getName()));
|
|
|
|
|
|
+ Path toPath = doneDirFc.makeQualified(new Path(targetDir,
|
|
|
|
+ historyFile.getName()));
|
|
try {
|
|
try {
|
|
moveToDoneNow(historyFile, toPath);
|
|
moveToDoneNow(historyFile, toPath);
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
- LOG.warn("Failed to move file: " + historyFile + " for jobId: " + jobId);
|
|
|
|
|
|
+ LOG.warn("Failed to move file: " + historyFile + " for jobId: "
|
|
|
|
+ + jobId);
|
|
throw e;
|
|
throw e;
|
|
}
|
|
}
|
|
metaInfo.setHistoryFile(toPath);
|
|
metaInfo.setHistoryFile(toPath);
|
|
}
|
|
}
|
|
if (confFile != null) {
|
|
if (confFile != null) {
|
|
- Path toPath = doneDirFc.makeQualified(new Path(targetDir, confFile.getName()));
|
|
|
|
|
|
+ Path toPath = doneDirFc.makeQualified(new Path(targetDir,
|
|
|
|
+ confFile.getName()));
|
|
try {
|
|
try {
|
|
moveToDoneNow(confFile, toPath);
|
|
moveToDoneNow(confFile, toPath);
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
- LOG.warn("Failed to move file: " + historyFile + " for jobId: " + jobId);
|
|
|
|
|
|
+ LOG.warn("Failed to move file: " + historyFile + " for jobId: "
|
|
|
|
+ + jobId);
|
|
throw e;
|
|
throw e;
|
|
}
|
|
}
|
|
metaInfo.setConfFile(toPath);
|
|
metaInfo.setConfFile(toPath);
|
|
@@ -953,7 +865,8 @@ public class JobHistory extends AbstractService implements HistoryContext {
|
|
}
|
|
}
|
|
} catch (FileNotFoundException fnfE) {
|
|
} catch (FileNotFoundException fnfE) {
|
|
try {
|
|
try {
|
|
- FsPermission fsp = new FsPermission(JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION);
|
|
|
|
|
|
+ FsPermission fsp =
|
|
|
|
+ new FsPermission(JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION);
|
|
doneDirFc.mkdir(path, fsp, true);
|
|
doneDirFc.mkdir(path, fsp, true);
|
|
FileStatus fsStatus = doneDirFc.getFileStatus(path);
|
|
FileStatus fsStatus = doneDirFc.getFileStatus(path);
|
|
LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
|
|
LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
|
|
@@ -972,12 +885,15 @@ public class JobHistory extends AbstractService implements HistoryContext {
|
|
}
|
|
}
|
|
|
|
|
|
private Path canonicalHistoryLogPath(JobId id, String timestampComponent) {
|
|
private Path canonicalHistoryLogPath(JobId id, String timestampComponent) {
|
|
- return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory(id, timestampComponent, serialNumberFormat));
|
|
|
|
|
|
+ return new Path(doneDirPrefixPath,
|
|
|
|
+ JobHistoryUtils.historyLogSubdirectory(id, timestampComponent, serialNumberFormat));
|
|
}
|
|
}
|
|
|
|
|
|
private Path canonicalHistoryLogPath(JobId id, long millisecondTime) {
|
|
private Path canonicalHistoryLogPath(JobId id, long millisecondTime) {
|
|
- String timestampComponent = JobHistoryUtils.timestampDirectoryComponent(millisecondTime, debugMode);
|
|
|
|
- return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory(id, timestampComponent, serialNumberFormat));
|
|
|
|
|
|
+ String timestampComponent =
|
|
|
|
+ JobHistoryUtils.timestampDirectoryComponent(millisecondTime, debugMode);
|
|
|
|
+ return new Path(doneDirPrefixPath,
|
|
|
|
+ JobHistoryUtils.historyLogSubdirectory(id, timestampComponent, serialNumberFormat));
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -1033,12 +949,13 @@ public class JobHistory extends AbstractService implements HistoryContext {
|
|
private Path summaryFile;
|
|
private Path summaryFile;
|
|
JobIndexInfo jobIndexInfo;
|
|
JobIndexInfo jobIndexInfo;
|
|
|
|
|
|
- MetaInfo(Path historyFile, Path confFile, Path summaryFile, JobIndexInfo jobIndexInfo) {
|
|
|
|
|
|
+ MetaInfo(Path historyFile, Path confFile, Path summaryFile,
|
|
|
|
+ JobIndexInfo jobIndexInfo) {
|
|
this.historyFile = historyFile;
|
|
this.historyFile = historyFile;
|
|
this.confFile = confFile;
|
|
this.confFile = confFile;
|
|
this.summaryFile = summaryFile;
|
|
this.summaryFile = summaryFile;
|
|
this.jobIndexInfo = jobIndexInfo;
|
|
this.jobIndexInfo = jobIndexInfo;
|
|
- }
|
|
|
|
|
|
+ }
|
|
|
|
|
|
Path getHistoryFile() { return historyFile; }
|
|
Path getHistoryFile() { return historyFile; }
|
|
Path getConfFile() { return confFile; }
|
|
Path getConfFile() { return confFile; }
|
|
@@ -1073,13 +990,19 @@ public class JobHistory extends AbstractService implements HistoryContext {
|
|
//Sort in ascending order. Relies on YYYY/MM/DD/Serial
|
|
//Sort in ascending order. Relies on YYYY/MM/DD/Serial
|
|
Collections.sort(serialDirList);
|
|
Collections.sort(serialDirList);
|
|
for (FileStatus serialDir : serialDirList) {
|
|
for (FileStatus serialDir : serialDirList) {
|
|
- List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(serialDir.getPath(), doneDirFc);
|
|
|
|
|
|
+ List<FileStatus> historyFileList =
|
|
|
|
+ scanDirectoryForHistoryFiles(serialDir.getPath(), doneDirFc);
|
|
for (FileStatus historyFile : historyFileList) {
|
|
for (FileStatus historyFile : historyFileList) {
|
|
- JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(historyFile.getPath().getName());
|
|
|
|
- long effectiveTimestamp = getEffectiveTimestamp(jobIndexInfo.getFinishTime(), historyFile);
|
|
|
|
|
|
+ JobIndexInfo jobIndexInfo =
|
|
|
|
+ FileNameIndexUtils.getIndexInfo(historyFile.getPath().getName());
|
|
|
|
+ long effectiveTimestamp =
|
|
|
|
+ getEffectiveTimestamp(jobIndexInfo.getFinishTime(), historyFile);
|
|
if (shouldDelete(effectiveTimestamp)) {
|
|
if (shouldDelete(effectiveTimestamp)) {
|
|
- String confFileName = JobHistoryUtils.getIntermediateConfFileName(jobIndexInfo.getJobId());
|
|
|
|
- MetaInfo metaInfo = new MetaInfo(historyFile.getPath(), new Path(historyFile.getPath().getParent(), confFileName), null, jobIndexInfo);
|
|
|
|
|
|
+ String confFileName =
|
|
|
|
+ JobHistoryUtils.getIntermediateConfFileName(jobIndexInfo.getJobId());
|
|
|
|
+ MetaInfo metaInfo = new MetaInfo(historyFile.getPath(),
|
|
|
|
+ new Path(historyFile.getPath().getParent(), confFileName),
|
|
|
|
+ null, jobIndexInfo);
|
|
delete(metaInfo);
|
|
delete(metaInfo);
|
|
} else {
|
|
} else {
|
|
halted = true;
|
|
halted = true;
|