|
@@ -28,14 +28,19 @@ import java.io.UnsupportedEncodingException;
|
|
|
import java.net.URLDecoder;
|
|
|
import java.net.URLEncoder;
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.Calendar;
|
|
|
import java.util.Collections;
|
|
|
import java.util.HashMap;
|
|
|
+import java.util.HashSet;
|
|
|
import java.util.Iterator;
|
|
|
import java.util.LinkedHashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.Set;
|
|
|
+import java.util.SortedMap;
|
|
|
import java.util.TreeMap;
|
|
|
import java.util.Map.Entry;
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
@@ -50,6 +55,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
|
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
+import org.apache.hadoop.fs.FileUtil;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.PathFilter;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
@@ -98,6 +104,8 @@ public class JobHistory {
|
|
|
static final String VALUE = "[^\"\\\\]*+(?:\\\\.[^\"\\\\]*+)*+";
|
|
|
|
|
|
static final Pattern pattern = Pattern.compile(KEY + "=" + "\"" + VALUE + "\"");
|
|
|
+
|
|
|
+ static final int MAXIMUM_DATESTRING_COUNT = 200000;
|
|
|
|
|
|
public static final int JOB_NAME_TRIM_LENGTH = 50;
|
|
|
private static String JOBTRACKER_UNIQUE_STRING = null;
|
|
@@ -114,20 +122,44 @@ public class JobHistory {
|
|
|
private static FileSystem DONEDIR_FS; // Done dir filesystem
|
|
|
private static JobConf jtConf;
|
|
|
private static Path DONE = null; // folder for completed jobs
|
|
|
+ private static String DONE_BEFORE_SERIAL_TAIL = doneSubdirsBeforeSerialTail();
|
|
|
+ private static String DONE_LEAF_FILES = DONE_BEFORE_SERIAL_TAIL + "/*";
|
|
|
private static boolean aclsEnabled = false;
|
|
|
+
|
|
|
+ static final String CONF_FILE_NAME_SUFFIX = "_conf.xml";
|
|
|
+
|
|
|
+ // XXXXX debug mode -- set this to false for production
|
|
|
+ private static final boolean DEBUG_MODE = true;
|
|
|
+
|
|
|
+ private static final int SERIAL_NUMBER_DIRECTORY_DIGITS = 6;
|
|
|
+ private static final int SERIAL_NUMBER_LOW_DIGITS = DEBUG_MODE ? 1 : 3;
|
|
|
+
|
|
|
+ private static final String SERIAL_NUMBER_FORMAT
|
|
|
+ = ("%0"
|
|
|
+ + (SERIAL_NUMBER_DIRECTORY_DIGITS + SERIAL_NUMBER_LOW_DIGITS)
|
|
|
+ + "d");
|
|
|
+
|
|
|
+ private static final Set<Path> existingDoneSubdirs = new HashSet<Path>();
|
|
|
+
|
|
|
+ private static final SortedMap<Integer, String> idToDateString
|
|
|
+ = new TreeMap<Integer, String>();
|
|
|
+
|
|
|
/**
|
|
|
* A filter for conf files
|
|
|
*/
|
|
|
private static final PathFilter CONF_FILTER = new PathFilter() {
|
|
|
public boolean accept(Path path) {
|
|
|
- return path.getName().endsWith("_conf.xml");
|
|
|
+ return path.getName().endsWith(CONF_FILE_NAME_SUFFIX);
|
|
|
}
|
|
|
};
|
|
|
|
|
|
- private static Map<JobID, MovedFileInfo> jobHistoryFileMap =
|
|
|
+ private static final Map<JobID, MovedFileInfo> jobHistoryFileMap =
|
|
|
Collections.<JobID,MovedFileInfo>synchronizedMap(
|
|
|
new LinkedHashMap<JobID, MovedFileInfo>());
|
|
|
|
|
|
+ private static final SortedMap<Long, String>jobToDirectoryMap
|
|
|
+ = new TreeMap<Long, String>();
|
|
|
+
|
|
|
private static class MovedFileInfo {
|
|
|
private final String historyFile;
|
|
|
private final long timestamp;
|
|
@@ -244,29 +276,35 @@ public class JobHistory {
|
|
|
executor.execute(new Runnable() {
|
|
|
|
|
|
public void run() {
|
|
|
- //move the files to DONE folder
|
|
|
+ long millisecondTime = System.currentTimeMillis();
|
|
|
+
|
|
|
+ Path resultDir = canonicalHistoryLogPath(id, millisecondTime);
|
|
|
+
|
|
|
+ //move the files to DONE canonical subfolder
|
|
|
try {
|
|
|
for (Path path : paths) {
|
|
|
//check if path exists, in case of retries it may not exist
|
|
|
if (LOGDIR_FS.exists(path)) {
|
|
|
+ maybeMakeSubdirectory(id, millisecondTime);
|
|
|
+
|
|
|
LOG.info("Moving " + path.toString() + " to " +
|
|
|
- DONE.toString());
|
|
|
- DONEDIR_FS.moveFromLocalFile(path, DONE);
|
|
|
- DONEDIR_FS.setPermission(new Path(DONE, path.getName()),
|
|
|
+ resultDir.toString());
|
|
|
+ DONEDIR_FS.moveFromLocalFile(path, resultDir);
|
|
|
+ DONEDIR_FS.setPermission(new Path(resultDir, path.getName()),
|
|
|
new FsPermission(HISTORY_FILE_PERMISSION));
|
|
|
}
|
|
|
}
|
|
|
} catch (Throwable e) {
|
|
|
- LOG.error("Unable to move history file to DONE folder.", e);
|
|
|
+ LOG.error("Unable to move history file to DONE canonical subfolder.", e);
|
|
|
}
|
|
|
String historyFileDonePath = null;
|
|
|
if (historyFile != null) {
|
|
|
- historyFileDonePath = new Path(DONE,
|
|
|
+ historyFileDonePath = new Path(resultDir,
|
|
|
historyFile.getName()).toString();
|
|
|
}
|
|
|
|
|
|
jobHistoryFileMap.put(id, new MovedFileInfo(historyFileDonePath,
|
|
|
- System.currentTimeMillis()));
|
|
|
+ millisecondTime));
|
|
|
jobTracker.historyFileCopied(id, historyFileDonePath);
|
|
|
|
|
|
//purge the job from the cache
|
|
@@ -280,6 +318,123 @@ public class JobHistory {
|
|
|
fileManager.getWriters(jobId).remove(writer);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ // several methods for manipulating the subdirectories of the DONE
|
|
|
+ // directory
|
|
|
+
|
|
|
+ private static int jobSerialNumber(JobID id) {
|
|
|
+ return id.getId();
|
|
|
+ }
|
|
|
+
|
|
|
+ private static String serialNumberDirectoryComponent(JobID id) {
|
|
|
+ return String.format(SERIAL_NUMBER_FORMAT,
|
|
|
+ new Integer(jobSerialNumber(id)))
|
|
|
+ .substring(0, SERIAL_NUMBER_DIRECTORY_DIGITS);
|
|
|
+ }
|
|
|
+
|
|
|
+ // directory components may contain internal slashes, but do NOT
|
|
|
+ // contain slashes at either end.
|
|
|
+
|
|
|
+ private static String timestampDirectoryComponent(JobID id, long millisecondTime) {
|
|
|
+ int serialNumber = jobSerialNumber(id);
|
|
|
+ Integer boxedSerialNumber = serialNumber;
|
|
|
+
|
|
|
+ // don't want to do this inside the lock
|
|
|
+ Calendar timestamp = Calendar.getInstance();
|
|
|
+ timestamp.setTimeInMillis(millisecondTime);
|
|
|
+
|
|
|
+ synchronized (idToDateString) {
|
|
|
+ String dateString = idToDateString.get(boxedSerialNumber);
|
|
|
+
|
|
|
+ if (dateString == null) {
|
|
|
+
|
|
|
+ dateString = String.format
|
|
|
+ ("%04d/%02d/%02d",
|
|
|
+ timestamp.get(Calendar.YEAR),
|
|
|
+ timestamp.get(DEBUG_MODE ? Calendar.HOUR : Calendar.MONTH),
|
|
|
+ timestamp.get(DEBUG_MODE ? Calendar.MINUTE : Calendar.DAY_OF_MONTH));
|
|
|
+
|
|
|
+ dateString = dateString.intern();
|
|
|
+
|
|
|
+ idToDateString.put(boxedSerialNumber, dateString);
|
|
|
+
|
|
|
+ if (idToDateString.size() > MAXIMUM_DATESTRING_COUNT) {
|
|
|
+ idToDateString.remove(idToDateString.firstKey());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return dateString;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+  // Returns true iff the directory was already known (cached in existingDoneSubdirs); returns false both when it was just created and when it existed on the filesystem but was not cached.
|
|
|
+ private static boolean maybeMakeSubdirectory(JobID id, long millisecondTime)
|
|
|
+ throws IOException {
|
|
|
+ Path dir = canonicalHistoryLogPath(id, millisecondTime);
|
|
|
+
|
|
|
+ synchronized (existingDoneSubdirs) {
|
|
|
+ if (existingDoneSubdirs.contains(dir)) {
|
|
|
+ if (DEBUG_MODE && !DONEDIR_FS.exists(dir)) {
|
|
|
+ System.err.println("JobHistory.maybeMakeSubdirectory -- We believed "
|
|
|
+ + dir + " already existed, but it didn't.");
|
|
|
+ }
|
|
|
+
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!DONEDIR_FS.exists(dir)) {
|
|
|
+ LOG.info("Creating DONE subfolder at "+ dir);
|
|
|
+
|
|
|
+ if (!DONEDIR_FS.mkdirs(dir,
|
|
|
+ new FsPermission(HISTORY_DIR_PERMISSION))) {
|
|
|
+ throw new IOException("Mkdirs failed to create " + dir.toString());
|
|
|
+ }
|
|
|
+
|
|
|
+ existingDoneSubdirs.add(dir);
|
|
|
+
|
|
|
+ return false;
|
|
|
+ } else {
|
|
|
+ if (DEBUG_MODE) {
|
|
|
+ System.err.println("JobHistory.maybeMakeSubdirectory -- We believed "
|
|
|
+ + dir + " didn't already exist, but it did.");
|
|
|
+ }
|
|
|
+
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static Path canonicalHistoryLogPath(JobID id, long millisecondTime) {
|
|
|
+ return new Path(DONE, historyLogSubdirectory(id, millisecondTime));
|
|
|
+ }
|
|
|
+
|
|
|
+ private static String historyLogSubdirectory(JobID id, long millisecondTime) {
|
|
|
+ String result = jobtrackerDirectoryComponent(id);
|
|
|
+
|
|
|
+ String serialNumberDirectory = serialNumberDirectoryComponent(id);
|
|
|
+
|
|
|
+ result = (result
|
|
|
+ + "/" + timestampDirectoryComponent(id, millisecondTime)
|
|
|
+ + "/" + serialNumberDirectory
|
|
|
+ + "/");
|
|
|
+
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ private static String jobtrackerDirectoryComponent(JobID id) {
|
|
|
+ return JOBTRACKER_UNIQUE_STRING;
|
|
|
+ }
|
|
|
+
|
|
|
+ private static String doneSubdirsBeforeSerialTail() {
|
|
|
+ // job tracker ID
|
|
|
+ String result = "/*"; // job tracker instance ID
|
|
|
+
|
|
|
+ // date
|
|
|
+ result = result + "/*/*/*"; // YYYY/MM/DD ;
|
|
|
+
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Record types are identifiers for each line of log in history files.
|
|
|
* A record type appears as the first token in a single line of log.
|
|
@@ -609,6 +764,18 @@ public class JobHistory {
|
|
|
static Path getCompletedJobHistoryLocation() {
|
|
|
return DONE;
|
|
|
}
|
|
|
+
|
|
|
+ static int serialNumberDirectoryDigits() {
|
|
|
+ return SERIAL_NUMBER_DIRECTORY_DIGITS;
|
|
|
+ }
|
|
|
+
|
|
|
+ static int serialNumberTotalDigits() {
|
|
|
+ return serialNumberDirectoryDigits() + SERIAL_NUMBER_LOW_DIGITS;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+   * TODO(review): incomplete javadoc fragment ("Get the ...") -- complete it or remove this orphaned comment block.
|
|
|
+ */
|
|
|
|
|
|
/**
|
|
|
* Base class contais utility stuff to manage types key value pairs with enums.
|
|
@@ -680,6 +847,105 @@ public class JobHistory {
|
|
|
return values;
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ // hasMismatches is just used to return a second value if you want
|
|
|
+ // one. I would have used MutableBoxedBoolean if such had been provided.
|
|
|
+ static Path[] filteredStat2Paths
|
|
|
+ (FileStatus[] stats, boolean dirs, AtomicBoolean hasMismatches) {
|
|
|
+ int resultCount = 0;
|
|
|
+
|
|
|
+ if (hasMismatches == null) {
|
|
|
+ hasMismatches = new AtomicBoolean(false);
|
|
|
+ }
|
|
|
+
|
|
|
+ for (int i = 0; i < stats.length; ++i) {
|
|
|
+ if (stats[i].isDir() == dirs) {
|
|
|
+ stats[resultCount++] = stats[i];
|
|
|
+ } else {
|
|
|
+ hasMismatches.set(true);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ Path[] paddedResult = FileUtil.stat2Paths(stats);
|
|
|
+
|
|
|
+ Path[] result = new Path[resultCount];
|
|
|
+
|
|
|
+ System.arraycopy(paddedResult, 0, result, 0, resultCount);
|
|
|
+
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ static FileStatus[] localGlobber
|
|
|
+ (FileSystem fs, Path root, String tail)
|
|
|
+ throws IOException {
|
|
|
+ return localGlobber(fs, root, tail, null);
|
|
|
+ }
|
|
|
+
|
|
|
+ static FileStatus[] localGlobber
|
|
|
+ (FileSystem fs, Path root, String tail, PathFilter filter)
|
|
|
+ throws IOException {
|
|
|
+ return localGlobber(fs, root, tail, filter, null);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+  // hasFlatFiles is just used to return a second value if you want
|
|
|
+ // one. I would have used MutableBoxedBoolean if such had been provided.
|
|
|
+ static FileStatus[] localGlobber
|
|
|
+ (FileSystem fs, Path root, String tail, PathFilter filter, AtomicBoolean hasFlatFiles)
|
|
|
+ throws IOException {
|
|
|
+ if (tail.equals("")) {
|
|
|
+ return filter == null ? fs.listStatus(root) : fs.listStatus(root, filter);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (tail.startsWith("/*")) {
|
|
|
+ Path[] subdirs = filteredStat2Paths(fs.listStatus(root), true, hasFlatFiles);
|
|
|
+
|
|
|
+ FileStatus[][] subsubdirs = new FileStatus[subdirs.length][];
|
|
|
+
|
|
|
+ int subsubdirCount = 0;
|
|
|
+
|
|
|
+ if (subsubdirs.length == 0) {
|
|
|
+ return new FileStatus[0];
|
|
|
+ }
|
|
|
+
|
|
|
+ String newTail = tail.substring(2);
|
|
|
+
|
|
|
+ for (int i = 0; i < subdirs.length; ++i) {
|
|
|
+ subsubdirs[i] = localGlobber(fs, subdirs[i], newTail, filter, null);
|
|
|
+ subsubdirCount += subsubdirs[i].length;
|
|
|
+ }
|
|
|
+
|
|
|
+ FileStatus[] result = new FileStatus[subsubdirCount];
|
|
|
+
|
|
|
+ int segmentStart = 0;
|
|
|
+
|
|
|
+ for (int i = 0; i < subsubdirs.length; ++i) {
|
|
|
+ System.arraycopy(subsubdirs[i], 0, result, segmentStart, subsubdirs[i].length);
|
|
|
+ segmentStart += subsubdirs[i].length;
|
|
|
+ }
|
|
|
+
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (tail.startsWith("/")) {
|
|
|
+ int split = tail.indexOf('/', 1);
|
|
|
+
|
|
|
+ if (split < 0) {
|
|
|
+ return (filter == null
|
|
|
+ ? fs.listStatus(new Path(root, tail.substring(1)))
|
|
|
+ : fs.listStatus(new Path(root, tail.substring(1)), filter));
|
|
|
+ } else {
|
|
|
+ String thisSegment = tail.substring(1, split);
|
|
|
+ String newTail = tail.substring(split);
|
|
|
+ return localGlobber
|
|
|
+ (fs, new Path(root, thisSegment), newTail, filter, hasFlatFiles);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ IOException e = new IOException("localGlobber: bad tail");
|
|
|
+
|
|
|
+ throw e;
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
|
* Helper class for logging or reading back events related to job start, finish or failure.
|
|
@@ -739,7 +1005,7 @@ public class JobHistory {
|
|
|
*/
|
|
|
public static String getLocalJobFilePath(JobID jobId){
|
|
|
return System.getProperty("hadoop.log.dir") + File.separator +
|
|
|
- jobId + "_conf.xml";
|
|
|
+ jobId + CONF_FILE_NAME_SUFFIX;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -867,12 +1133,12 @@ public class JobHistory {
|
|
|
/**
|
|
|
* Generates the job history filename for a new job
|
|
|
*/
|
|
|
- private static String getNewJobHistoryFileName(JobConf jobConf, JobID id) {
|
|
|
- return JOBTRACKER_UNIQUE_STRING
|
|
|
- + id.toString() + "_" +
|
|
|
- getUserName(jobConf)
|
|
|
- + "_"
|
|
|
- + trimJobName(getJobName(jobConf));
|
|
|
+ private static String getNewJobHistoryFileName(JobConf jobConf, JobID id, long submitTime) {
|
|
|
+ return
|
|
|
+ id.toString() + "_"
|
|
|
+ + submitTime + "_"
|
|
|
+ + getUserName(jobConf) + "_"
|
|
|
+ + trimJobName(getJobName(jobConf));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -892,7 +1158,7 @@ public class JobHistory {
|
|
|
/**
|
|
|
* Recover the job history filename from the history folder.
|
|
|
* Uses the following pattern
|
|
|
- * $jt-hostname_[0-9]*_$job-id_$user-$job-name*
|
|
|
+   * $job-id_$submit-time-in-millis_$user_$job-name*
|
|
|
* @param jobConf the job conf
|
|
|
* @param id job id
|
|
|
*/
|
|
@@ -902,6 +1168,7 @@ public class JobHistory {
|
|
|
return getJobHistoryFileName(jobConf, id, new Path(LOG_DIR), LOGDIR_FS);
|
|
|
}
|
|
|
|
|
|
+ // Returns that portion of the pathname that sits under the DONE directory
|
|
|
static synchronized String getDoneJobHistoryFileName(JobConf jobConf,
|
|
|
JobID id) throws IOException {
|
|
|
if (DONE == null) {
|
|
@@ -909,7 +1176,7 @@ public class JobHistory {
|
|
|
}
|
|
|
return getJobHistoryFileName(jobConf, id, DONE, DONEDIR_FS);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* @param dir The directory where to search.
|
|
|
*/
|
|
@@ -924,8 +1191,7 @@ public class JobHistory {
|
|
|
|
|
|
// Make the pattern matching the job's history file
|
|
|
final Pattern historyFilePattern =
|
|
|
- Pattern.compile(jobtrackerHostname + "_" + DIGITS + "_"
|
|
|
- + id.toString() + "_" + user + "_"
|
|
|
+ Pattern.compile(id.toString() + "_" + DIGITS + "_" + user + "_"
|
|
|
+ escapeRegexChars(jobName) + "+");
|
|
|
// a path filter that matches 4 parts of the filenames namely
|
|
|
// - jt-hostname
|
|
@@ -945,15 +1211,46 @@ public class JobHistory {
|
|
|
return historyFilePattern.matcher(fileName).find();
|
|
|
}
|
|
|
};
|
|
|
+
|
|
|
+ FileStatus[] statuses = null;
|
|
|
+
|
|
|
+ if (dir == DONE) {
|
|
|
+ final String snDirectoryComponent
|
|
|
+ = serialNumberDirectoryComponent(id);
|
|
|
+
|
|
|
+ final String scanTail
|
|
|
+ = (DONE_BEFORE_SERIAL_TAIL
|
|
|
+ + "/" + serialNumberDirectoryComponent(id));
|
|
|
+
|
|
|
+ if (DEBUG_MODE) {
|
|
|
+ System.err.println("JobHistory.getJobHistoryFileName DONE dir: scanning " + scanTail);
|
|
|
+
|
|
|
+ (new IOException("debug exception")).printStackTrace(System.err);
|
|
|
+ }
|
|
|
+
|
|
|
+ statuses = localGlobber(fs, DONE, scanTail, filter);
|
|
|
+ } else {
|
|
|
+ statuses = fs.listStatus(dir, filter);
|
|
|
+ }
|
|
|
|
|
|
- FileStatus[] statuses = fs.listStatus(dir, filter);
|
|
|
String filename = null;
|
|
|
- if (statuses.length == 0) {
|
|
|
+ if (statuses == null || statuses.length == 0) {
|
|
|
+ if (DEBUG_MODE) {
|
|
|
+ System.err.println("Nothing to recover for job " + id);
|
|
|
+ }
|
|
|
LOG.info("Nothing to recover for job " + id);
|
|
|
} else {
|
|
|
// return filename considering that fact the name can be a
|
|
|
// secondary filename like filename.recover
|
|
|
filename = getPrimaryFilename(statuses[0].getPath().getName(), jobName);
|
|
|
+ if (dir == DONE) {
|
|
|
+ Path parent = statuses[0].getPath().getParent();
|
|
|
+ String parentPathName = parent.toString();
|
|
|
+ String donePathName = DONE.toString();
|
|
|
+ filename = (parentPathName.substring(donePathName.length() + Path.SEPARATOR.length())
|
|
|
+ + Path.SEPARATOR + filename);
|
|
|
+ }
|
|
|
+
|
|
|
LOG.info("Recovered job history filename for job " + id + " is "
|
|
|
+ filename);
|
|
|
}
|
|
@@ -1146,9 +1443,9 @@ public class JobHistory {
|
|
|
* jobhistory file is complete.
|
|
|
* This *should* be the last call to jobhistory for a given job.
|
|
|
*/
|
|
|
- static void markCompleted(JobID id) throws IOException {
|
|
|
- fileManager.moveToDone(id);
|
|
|
- }
|
|
|
+ static void markCompleted(JobID id) throws IOException {
|
|
|
+ fileManager.moveToDone(id);
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
|
* Log job submitted event to history. Creates a new file in history
|
|
@@ -1162,8 +1459,8 @@ public class JobHistory {
|
|
|
* @deprecated Use
|
|
|
* {@link #logSubmitted(JobID, JobConf, String, long, boolean)} instead.
|
|
|
*/
|
|
|
- @Deprecated
|
|
|
- public static void logSubmitted(JobID jobId, JobConf jobConf,
|
|
|
+ @Deprecated
|
|
|
+ public static void logSubmitted(JobID jobId, JobConf jobConf,
|
|
|
String jobConfPath, long submitTime)
|
|
|
throws IOException {
|
|
|
logSubmitted(jobId, jobConf, jobConfPath, submitTime, true);
|
|
@@ -1188,7 +1485,8 @@ public class JobHistory {
|
|
|
logFileName = getJobHistoryFileName(jobConf, jobId);
|
|
|
if (logFileName == null) {
|
|
|
logFileName =
|
|
|
- encodeJobHistoryFileName(getNewJobHistoryFileName(jobConf, jobId));
|
|
|
+ encodeJobHistoryFileName(getNewJobHistoryFileName
|
|
|
+ (jobConf, jobId, submitTime));
|
|
|
} else {
|
|
|
String parts[] = logFileName.split("_");
|
|
|
//TODO this is a hack :(
|
|
@@ -1198,7 +1496,8 @@ public class JobHistory {
|
|
|
}
|
|
|
} else {
|
|
|
logFileName =
|
|
|
- encodeJobHistoryFileName(getNewJobHistoryFileName(jobConf, jobId));
|
|
|
+ encodeJobHistoryFileName(getNewJobHistoryFileName
|
|
|
+ (jobConf, jobId, submitTime));
|
|
|
}
|
|
|
|
|
|
// setup the history log file for this job
|
|
@@ -1302,13 +1601,13 @@ public class JobHistory {
|
|
|
Path jobFilePath = null;
|
|
|
if (LOG_DIR != null) {
|
|
|
jobFilePath = new Path(LOG_DIR + File.separator +
|
|
|
- jobUniqueString + "_conf.xml");
|
|
|
+ jobUniqueString + CONF_FILE_NAME_SUFFIX);
|
|
|
fileManager.setConfFile(jobId, jobFilePath);
|
|
|
}
|
|
|
Path userJobFilePath = null;
|
|
|
if (userLogDir != null) {
|
|
|
userJobFilePath = new Path(userLogDir + File.separator +
|
|
|
- jobUniqueString + "_conf.xml");
|
|
|
+ jobUniqueString + CONF_FILE_NAME_SUFFIX);
|
|
|
}
|
|
|
FSDataOutputStream jobFileOut = null;
|
|
|
try {
|
|
@@ -2047,6 +2346,23 @@ public class JobHistory {
|
|
|
*/
|
|
|
public void handle(RecordTypes recType, Map<Keys, String> values) throws IOException;
|
|
|
}
|
|
|
+
|
|
|
+ static long directoryTime(String year, String seg2, String seg3) {
|
|
|
+ // set to current time. In debug mode, this is where the month
|
|
|
+ // and day get set.
|
|
|
+ Calendar result = Calendar.getInstance();
|
|
|
+ // canonicalize by filling in unset fields
|
|
|
+ result.setTimeInMillis(System.currentTimeMillis());
|
|
|
+
|
|
|
+ result.set(Calendar.YEAR, Integer.parseInt(year));
|
|
|
+
|
|
|
+ result.set(DEBUG_MODE ? Calendar.HOUR : Calendar.MONTH,
|
|
|
+ Integer.parseInt(seg2));
|
|
|
+ result.set(DEBUG_MODE ? Calendar.MINUTE : Calendar.DAY_OF_MONTH,
|
|
|
+ Integer.parseInt(seg3));
|
|
|
+
|
|
|
+ return result.getTimeInMillis();
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
|
* Delete history files older than one month. Update master index and remove all
|
|
@@ -2054,35 +2370,103 @@ public class JobHistory {
|
|
|
* remove reference to the job tracker.
|
|
|
*
|
|
|
*/
|
|
|
- public static class HistoryCleaner implements Runnable{
|
|
|
+ public static class HistoryCleaner implements Runnable {
|
|
|
static final long ONE_DAY_IN_MS = 24 * 60 * 60 * 1000L;
|
|
|
- static final long THIRTY_DAYS_IN_MS = 30 * ONE_DAY_IN_MS;
|
|
|
+ static final long DIRECTORY_LIFE_IN_MS
|
|
|
+ = DEBUG_MODE ? 20 * 60 * 1000L : 30 * ONE_DAY_IN_MS;
|
|
|
+ static final long RUN_INTERVAL
|
|
|
+ = DEBUG_MODE ? 10L * 60L * 1000L : ONE_DAY_IN_MS;
|
|
|
private long now;
|
|
|
- private static boolean isRunning = false;
|
|
|
+ private static final AtomicBoolean isRunning = new AtomicBoolean(false);
|
|
|
private static long lastRan = 0;
|
|
|
|
|
|
+ private static Pattern parseDirectory
|
|
|
+ = Pattern.compile(".+/([0-9]+)/([0-9]+)/([0-9]+)/[0-9]+/?");
|
|
|
+
|
|
|
/**
|
|
|
* Cleans up history data.
|
|
|
*/
|
|
|
- public void run(){
|
|
|
- if (isRunning){
|
|
|
+ public void run() {
|
|
|
+ if (isRunning.getAndSet(true)) {
|
|
|
return;
|
|
|
}
|
|
|
now = System.currentTimeMillis();
|
|
|
// clean history only once a day at max
|
|
|
- if (lastRan != 0 && (now - lastRan) < ONE_DAY_IN_MS) {
|
|
|
+ if (lastRan != 0 && (now - lastRan) < RUN_INTERVAL) {
|
|
|
+ isRunning.set(false);
|
|
|
return;
|
|
|
}
|
|
|
- lastRan = now;
|
|
|
- isRunning = true;
|
|
|
+ lastRan = now;
|
|
|
+
|
|
|
+ Set<String> deletedPathnames = new HashSet<String>();
|
|
|
+
|
|
|
+ // XXXXX debug code
|
|
|
+ boolean printedOneDeletee = false;
|
|
|
+ boolean printedOneMovedFile = false;
|
|
|
+
|
|
|
try {
|
|
|
- FileStatus[] historyFiles = DONEDIR_FS.listStatus(DONE);
|
|
|
- // delete if older than 30 days
|
|
|
- if (historyFiles != null) {
|
|
|
- for (FileStatus f : historyFiles) {
|
|
|
- if (now - f.getModificationTime() > THIRTY_DAYS_IN_MS) {
|
|
|
- DONEDIR_FS.delete(f.getPath(), true);
|
|
|
- LOG.info("Deleting old history file : " + f.getPath());
|
|
|
+ Path[] datedDirectories
|
|
|
+ = FileUtil.stat2Paths(localGlobber(DONEDIR_FS, DONE,
|
|
|
+ DONE_BEFORE_SERIAL_TAIL, null));
|
|
|
+ // find directories older than 30 days
|
|
|
+ for (int i = 0; i < datedDirectories.length; ++i) {
|
|
|
+ String thisDir = datedDirectories[i].toString();
|
|
|
+ Matcher pathMatcher = parseDirectory.matcher(thisDir);
|
|
|
+
|
|
|
+ if (pathMatcher.matches()) {
|
|
|
+ long dirTime = directoryTime(pathMatcher.group(1),
|
|
|
+ pathMatcher.group(2),
|
|
|
+ pathMatcher.group(3));
|
|
|
+
|
|
|
+ if (DEBUG_MODE) {
|
|
|
+ System.err.println("HistoryCleaner.run just parsed " + thisDir
|
|
|
+ + " as year/month/day = " + pathMatcher.group(1)
|
|
|
+ + "/" + pathMatcher.group(2) + "/"
|
|
|
+ + pathMatcher.group(3));
|
|
|
+ }
|
|
|
+
|
|
|
+ if (dirTime < now - DIRECTORY_LIFE_IN_MS) {
|
|
|
+
|
|
|
+ if (DEBUG_MODE) {
|
|
|
+ Calendar then = Calendar.getInstance();
|
|
|
+ then.setTimeInMillis(dirTime);
|
|
|
+ Calendar nnow = Calendar.getInstance();
|
|
|
+ nnow.setTimeInMillis(now);
|
|
|
+
|
|
|
+ System.err.println("HistoryCleaner.run directory: " + thisDir
|
|
|
+ + " because its time is " + then
|
|
|
+ + " but it's now " + nnow);
|
|
|
+ System.err.println("then = " + dirTime);
|
|
|
+ System.err.println("now = " + now);
|
|
|
+ }
|
|
|
+
|
|
|
+ // remove every file in the directory and save the name
|
|
|
+ // so we can remove it from jobHistoryFileMap
|
|
|
+ Path[] deletees
|
|
|
+ = FileUtil.stat2Paths(localGlobber(DONEDIR_FS,
|
|
|
+ datedDirectories[i],
|
|
|
+ "/*/*", // sn + individual files
|
|
|
+ null));
|
|
|
+
|
|
|
+ for (int j = 0; j < deletees.length; ++j) {
|
|
|
+
|
|
|
+ if (DEBUG_MODE && !printedOneDeletee) {
|
|
|
+ System.err.println("HistoryCleaner.run deletee: " + deletees[j].toString());
|
|
|
+ printedOneDeletee = true;
|
|
|
+ }
|
|
|
+
|
|
|
+ DONEDIR_FS.delete(deletees[j]);
|
|
|
+ deletedPathnames.add(deletees[j].toString());
|
|
|
+ }
|
|
|
+ synchronized (existingDoneSubdirs) {
|
|
|
+ if (!existingDoneSubdirs.contains(datedDirectories[i]))
|
|
|
+ {
|
|
|
+ LOG.warn("JobHistory: existingDoneSubdirs doesn't contain "
|
|
|
+ + datedDirectories[i] + ", but should.");
|
|
|
+ }
|
|
|
+ DONEDIR_FS.delete(datedDirectories[i], true);
|
|
|
+ existingDoneSubdirs.remove(datedDirectories[i]);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -2093,20 +2477,23 @@ public class JobHistory {
|
|
|
jobHistoryFileMap.entrySet().iterator();
|
|
|
while (it.hasNext()) {
|
|
|
MovedFileInfo info = it.next().getValue();
|
|
|
- if (now - info.timestamp > THIRTY_DAYS_IN_MS) {
|
|
|
+
|
|
|
+ if (DEBUG_MODE && !printedOneMovedFile) {
|
|
|
+ System.err.println("HistoryCleaner.run a moved file: " + info.historyFile);
|
|
|
+ printedOneMovedFile = true;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (deletedPathnames.contains(info.historyFile)) {
|
|
|
it.remove();
|
|
|
- } else {
|
|
|
- //since entries are in sorted timestamp order, no more entries
|
|
|
- //are required to be checked
|
|
|
- break;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
} catch (IOException ie) {
|
|
|
LOG.info("Error cleaning up history directory" +
|
|
|
StringUtils.stringifyException(ie));
|
|
|
+ } finally {
|
|
|
+ isRunning.set(false);
|
|
|
}
|
|
|
- isRunning = false;
|
|
|
}
|
|
|
|
|
|
static long getLastRan() {
|