|
@@ -62,12 +62,21 @@ import org.apache.hadoop.util.StringUtils;
|
|
|
* For parsing the job history it supports a listener based interface where each line is parsed
|
|
|
* and passed to listener. The listener can create an object model of history or look for specific
|
|
|
* events and discard rest of the history.
|
|
|
+ *
|
|
|
+ * CHANGE LOG :
|
|
|
+ * Version 0 : The history has the following format :
|
|
|
+ * TAG KEY1="VALUE1" KEY2="VALUE2" and so on.
|
|
|
+ TAG can be Job, Task, MapAttempt or ReduceAttempt.
|
|
|
+ Note that a '"' is the line delimiter.
|
|
|
+ * Version 1 : Changes the line delimiter to '.'
|
|
|
+ Values are now escaped for unambiguous parsing.
|
|
|
+ Added the Meta tag to store version info.
|
|
|
*/
|
|
|
public class JobHistory {
|
|
|
|
|
|
+ static final long VERSION = 1L;
|
|
|
public static final Log LOG = LogFactory.getLog(JobHistory.class);
|
|
|
private static final String DELIMITER = " ";
|
|
|
- private static final String LINE_DELIMITER = ".";
|
|
|
private static final char LINE_DELIMITER_CHAR = '.';
|
|
|
private static final char[] charsToEscape = new char[] {'"', '=',
|
|
|
LINE_DELIMITER_CHAR};
|
|
@@ -91,7 +100,7 @@ public class JobHistory {
|
|
|
* A record type appears as the first token in a single line of log.
|
|
|
*/
|
|
|
public static enum RecordTypes {
|
|
|
- Jobtracker, Job, Task, MapAttempt, ReduceAttempt
|
|
|
+ Jobtracker, Job, Task, MapAttempt, ReduceAttempt, Meta
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -105,7 +114,7 @@ public class JobHistory {
|
|
|
FINISHED_MAPS, FINISHED_REDUCES, JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE,
|
|
|
ERROR, TASK_ATTEMPT_ID, TASK_STATUS, COPY_PHASE, SORT_PHASE, REDUCE_PHASE,
|
|
|
SHUFFLE_FINISHED, SORT_FINISHED, COUNTERS, SPLITS, JOB_PRIORITY, HTTP_PORT,
|
|
|
- TRACKER_NAME, STATE_STRING
|
|
|
+ TRACKER_NAME, STATE_STRING, VERSION
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -158,6 +167,70 @@ public class JobHistory {
|
|
|
return !(disableHistory);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Manages job-history's meta information such as version etc.
|
|
|
+ * Helps in logging version information to the job-history and recover
|
|
|
+ * version information from the history.
|
|
|
+ */
|
|
|
+ static class MetaInfoManager implements Listener {
|
|
|
+ private long version = 0L;
|
|
|
+ private KeyValuePair pairs = new KeyValuePair();
|
|
|
+
|
|
|
+ // Extract the version of the history format that was used to write the given line
|
|
|
+ public MetaInfoManager(String line) throws IOException {
|
|
|
+ if (null != line) {
|
|
|
+ // Parse the line
|
|
|
+ parseLine(line, this, false);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Get the line delimiter
|
|
|
+ char getLineDelim() {
|
|
|
+ if (version == 0) {
|
|
|
+ return '"';
|
|
|
+ } else {
|
|
|
+ return LINE_DELIMITER_CHAR;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Checks if the values are escaped or not
|
|
|
+ boolean isValueEscaped() {
|
|
|
+ // Note that the values are not escaped in version 0
|
|
|
+ return version != 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void handle(RecordTypes recType, Map<Keys, String> values)
|
|
|
+ throws IOException {
|
|
|
+ // Check if the record is of type META
|
|
|
+ if (RecordTypes.Meta == recType) {
|
|
|
+ pairs.handle(values);
|
|
|
+ version = pairs.getLong(Keys.VERSION); // defaults to 0
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Logs history meta-info to the history file. This needs to be called once
|
|
|
+ * per history file.
|
|
|
+ * @param writers the history writers to which the meta-info record is logged.
|
|
|
+ */
|
|
|
+ static void logMetaInfo(ArrayList<PrintWriter> writers){
|
|
|
+ if (!disableHistory){
|
|
|
+ if (null != writers){
|
|
|
+ JobHistory.log(writers, RecordTypes.Meta,
|
|
|
+ new Keys[] {Keys.VERSION},
|
|
|
+ new String[] {String.valueOf(VERSION)});
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /** Escapes the string especially for {@link JobHistory}
|
|
|
+ */
|
|
|
+ static String escapeString(String data) {
|
|
|
+ return StringUtils.escapeString(data, StringUtils.ESCAPE_CHAR,
|
|
|
+ charsToEscape);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Parses history file and invokes Listener.handle() for
|
|
|
* each line of history. It can be used for looking through history
|
|
@@ -174,17 +247,34 @@ public class JobHistory {
|
|
|
try {
|
|
|
String line = null;
|
|
|
StringBuffer buf = new StringBuffer();
|
|
|
- while ((line = reader.readLine())!= null){
|
|
|
+
|
|
|
+ // Read the meta-info line. Note that this might be a jobinfo line for files
|
|
|
+ // written with older format
|
|
|
+ line = reader.readLine();
|
|
|
+
|
|
|
+ // Check if the file is empty
|
|
|
+ if (line == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Get the information required for further processing
|
|
|
+ MetaInfoManager mgr = new MetaInfoManager(line);
|
|
|
+ boolean isEscaped = mgr.isValueEscaped();
|
|
|
+ String lineDelim = String.valueOf(mgr.getLineDelim());
|
|
|
+ String escapedLineDelim =
|
|
|
+ StringUtils.escapeString(lineDelim, StringUtils.ESCAPE_CHAR,
|
|
|
+ mgr.getLineDelim());
|
|
|
+
|
|
|
+ do {
|
|
|
buf.append(line);
|
|
|
- if (!line.trim().endsWith(LINE_DELIMITER) ||
|
|
|
- line.trim().endsWith(StringUtils.escapeString(LINE_DELIMITER,
|
|
|
- StringUtils.ESCAPE_CHAR, LINE_DELIMITER_CHAR))) {
|
|
|
+ if (!line.trim().endsWith(lineDelim)
|
|
|
+ || line.trim().endsWith(escapedLineDelim)) {
|
|
|
buf.append("\n");
|
|
|
continue;
|
|
|
}
|
|
|
- parseLine(buf.toString(), l);
|
|
|
+ parseLine(buf.toString(), l, isEscaped);
|
|
|
buf = new StringBuffer();
|
|
|
- }
|
|
|
+ } while ((line = reader.readLine())!= null);
|
|
|
} finally {
|
|
|
try { reader.close(); } catch (IOException ex) {}
|
|
|
}
|
|
@@ -196,7 +286,8 @@ public class JobHistory {
|
|
|
* @param l
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- private static void parseLine(String line, Listener l)throws IOException{
|
|
|
+ private static void parseLine(String line, Listener l, boolean isEscaped)
|
|
|
+ throws IOException{
|
|
|
// extract the record type
|
|
|
int idx = line.indexOf(' ');
|
|
|
String recType = line.substring(0, idx);
|
|
@@ -208,8 +299,10 @@ public class JobHistory {
|
|
|
String tuple = matcher.group(0);
|
|
|
String []parts = StringUtils.split(tuple, StringUtils.ESCAPE_CHAR, '=');
|
|
|
String value = parts[1].substring(1, parts[1].length() -1);
|
|
|
- value = StringUtils.unEscapeString(value, StringUtils.ESCAPE_CHAR,
|
|
|
- charsToEscape);
|
|
|
+ if (isEscaped) {
|
|
|
+ value = StringUtils.unEscapeString(value, StringUtils.ESCAPE_CHAR,
|
|
|
+ charsToEscape);
|
|
|
+ }
|
|
|
parseBuffer.put(Keys.valueOf(parts[0]), value);
|
|
|
}
|
|
|
|
|
@@ -228,10 +321,9 @@ public class JobHistory {
|
|
|
|
|
|
static void log(PrintWriter out, RecordTypes recordType, Keys key,
|
|
|
String value){
|
|
|
- value = StringUtils.escapeString(value, StringUtils.ESCAPE_CHAR,
|
|
|
- charsToEscape);
|
|
|
+ value = escapeString(value);
|
|
|
out.println(recordType.name() + DELIMITER + key + "=\"" + value + "\""
|
|
|
- + DELIMITER + LINE_DELIMITER);
|
|
|
+ + DELIMITER + LINE_DELIMITER_CHAR);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -249,13 +341,12 @@ public class JobHistory {
|
|
|
for(int i =0; i< keys.length; i++){
|
|
|
buf.append(keys[i]);
|
|
|
buf.append("=\"");
|
|
|
- values[i] = StringUtils.escapeString(values[i],
|
|
|
- StringUtils.ESCAPE_CHAR, charsToEscape);
|
|
|
+ values[i] = escapeString(values[i]);
|
|
|
buf.append(values[i]);
|
|
|
buf.append("\"");
|
|
|
buf.append(DELIMITER);
|
|
|
}
|
|
|
- buf.append(LINE_DELIMITER);
|
|
|
+ buf.append(LINE_DELIMITER_CHAR);
|
|
|
|
|
|
for (PrintWriter out : writers) {
|
|
|
out.println(buf.toString());
|
|
@@ -747,6 +838,9 @@ public class JobHistory {
|
|
|
}
|
|
|
|
|
|
openJobs.put(jobUniqueString, writers);
|
|
|
+
|
|
|
+ // Log the history meta info
|
|
|
+ JobHistory.MetaInfoManager.logMetaInfo(writers);
|
|
|
|
|
|
//add to writer as well
|
|
|
JobHistory.log(writers, RecordTypes.Job,
|