|
@@ -71,17 +71,23 @@ public class JobHistory {
|
|
|
* Record types are identifiers for each line of log in history files.
|
|
|
* A record type appears as the first token in a single line of log.
|
|
|
*/
|
|
|
- public static enum RecordTypes {Jobtracker, Job, Task, MapAttempt, ReduceAttempt};
|
|
|
+ public static enum RecordTypes {
|
|
|
+ Jobtracker, Job, Task, MapAttempt, ReduceAttempt
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Job history files contain key="value" pairs, where keys belong to this enum.
|
|
|
* It acts as a global namespace for all keys.
|
|
|
*/
|
|
|
- public static enum Keys { JOBTRACKERID,
|
|
|
- START_TIME, FINISH_TIME, JOBID, JOBNAME, USER, JOBCONF, SUBMIT_TIME, LAUNCH_TIME,
|
|
|
- TOTAL_MAPS, TOTAL_REDUCES, FAILED_MAPS, FAILED_REDUCES, FINISHED_MAPS, FINISHED_REDUCES,
|
|
|
- JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE, ERROR, TASK_ATTEMPT_ID, TASK_STATUS,
|
|
|
- COPY_PHASE, SORT_PHASE, REDUCE_PHASE, SHUFFLE_FINISHED, SORT_FINISHED
|
|
|
- };
|
|
|
+ public static enum Keys {
|
|
|
+ JOBTRACKERID,
|
|
|
+ START_TIME, FINISH_TIME, JOBID, JOBNAME, USER, JOBCONF, SUBMIT_TIME,
|
|
|
+ LAUNCH_TIME, TOTAL_MAPS, TOTAL_REDUCES, FAILED_MAPS, FAILED_REDUCES,
|
|
|
+ FINISHED_MAPS, FINISHED_REDUCES, JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE,
|
|
|
+ ERROR, TASK_ATTEMPT_ID, TASK_STATUS, COPY_PHASE, SORT_PHASE, REDUCE_PHASE,
|
|
|
+ SHUFFLE_FINISHED, SORT_FINISHED, COUNTERS
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* This enum contains some of the values commonly used by history log events.
|
|
|
* since values in history can only be strings - Values.name() is used in
|
|
@@ -89,7 +95,8 @@ public class JobHistory {
|
|
|
*/
|
|
|
public static enum Values {
|
|
|
SUCCESS, FAILED, KILLED, MAP, REDUCE
|
|
|
- };
|
|
|
+ }
|
|
|
+
|
|
|
// temp buffer for parsed dataa
|
|
|
private static Map<Keys,String> parseBuffer = new HashMap<Keys, String>();
|
|
|
|
|
@@ -181,7 +188,8 @@ public class JobHistory {
|
|
|
* @param value value
|
|
|
*/
|
|
|
|
|
|
- static void log(PrintWriter out, RecordTypes recordType, Enum key, String value){
|
|
|
+ static void log(PrintWriter out, RecordTypes recordType, Keys key,
|
|
|
+ String value){
|
|
|
out.println(recordType.name() + DELIMITER + key + "=\"" + value + "\"");
|
|
|
out.flush();
|
|
|
}
|
|
@@ -194,7 +202,8 @@ public class JobHistory {
|
|
|
* @param values type of log event
|
|
|
*/
|
|
|
|
|
|
- static void log(PrintWriter out, RecordTypes recordType, Enum[] keys, String[] values){
|
|
|
+ static void log(PrintWriter out, RecordTypes recordType, Keys[] keys,
|
|
|
+ String[] values){
|
|
|
StringBuffer buf = new StringBuffer(recordType.name());
|
|
|
buf.append(DELIMITER);
|
|
|
for(int i =0; i< keys.length; i++){
|
|
@@ -340,7 +349,7 @@ public class JobHistory {
|
|
|
if (!disableHistory){
|
|
|
synchronized(MASTER_INDEX_LOG_FILE){
|
|
|
JobHistory.log(masterIndex, RecordTypes.Job,
|
|
|
- new Enum[]{Keys.JOBID, Keys.JOBNAME, Keys.USER, Keys.SUBMIT_TIME, Keys.JOBCONF },
|
|
|
+ new Keys[]{Keys.JOBID, Keys.JOBNAME, Keys.USER, Keys.SUBMIT_TIME, Keys.JOBCONF },
|
|
|
new String[]{jobId, jobName, user,
|
|
|
String.valueOf(submitTime), jobConfPath}
|
|
|
);
|
|
@@ -354,7 +363,7 @@ public class JobHistory {
|
|
|
openJobs.put(logFileName, writer);
|
|
|
// add to writer as well
|
|
|
JobHistory.log(writer, RecordTypes.Job,
|
|
|
- new Enum[]{Keys.JOBID, Keys.JOBNAME, Keys.USER, Keys.SUBMIT_TIME, Keys.JOBCONF },
|
|
|
+ new Keys[]{Keys.JOBID, Keys.JOBNAME, Keys.USER, Keys.SUBMIT_TIME, Keys.JOBCONF },
|
|
|
new String[]{jobId, jobName, user,
|
|
|
String.valueOf(submitTime) , jobConfPath}
|
|
|
);
|
|
@@ -389,7 +398,7 @@ public class JobHistory {
|
|
|
if (!disableHistory){
|
|
|
synchronized(MASTER_INDEX_LOG_FILE){
|
|
|
JobHistory.log(masterIndex, RecordTypes.Job,
|
|
|
- new Enum[] {Keys.JOBID, Keys.LAUNCH_TIME, Keys.TOTAL_MAPS, Keys.TOTAL_REDUCES },
|
|
|
+ new Keys[] {Keys.JOBID, Keys.LAUNCH_TIME, Keys.TOTAL_MAPS, Keys.TOTAL_REDUCES },
|
|
|
new String[] {jobId, String.valueOf(startTime),
|
|
|
String.valueOf(totalMaps), String.valueOf(totalReduces) });
|
|
|
}
|
|
@@ -399,7 +408,7 @@ public class JobHistory {
|
|
|
|
|
|
if (null != writer){
|
|
|
JobHistory.log(writer, RecordTypes.Job,
|
|
|
- new Enum[] {Keys.JOBID, Keys.LAUNCH_TIME, Keys.TOTAL_MAPS, Keys.TOTAL_REDUCES },
|
|
|
+ new Keys[] {Keys.JOBID, Keys.LAUNCH_TIME, Keys.TOTAL_MAPS, Keys.TOTAL_REDUCES },
|
|
|
new String[] {jobId, String.valueOf(startTime), String.valueOf(totalMaps), String.valueOf(totalReduces)});
|
|
|
}
|
|
|
}
|
|
@@ -412,15 +421,22 @@ public class JobHistory {
|
|
|
* @param finishedReduces no of reduces finished sucessfully.
|
|
|
* @param failedMaps no of failed map tasks.
|
|
|
* @param failedReduces no of failed reduce tasks.
|
|
|
+ * @param counters the counters from the job
|
|
|
*/
|
|
|
- public static void logFinished(String jobId, long finishTime, int finishedMaps, int finishedReduces,
|
|
|
- int failedMaps, int failedReduces){
|
|
|
+ public static void logFinished(String jobId, long finishTime,
|
|
|
+ int finishedMaps, int finishedReduces,
|
|
|
+ int failedMaps, int failedReduces,
|
|
|
+ Counters counters){
|
|
|
if (!disableHistory){
|
|
|
synchronized(MASTER_INDEX_LOG_FILE){
|
|
|
JobHistory.log(masterIndex, RecordTypes.Job,
|
|
|
- new Enum[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES },
|
|
|
- new String[] {jobId, "" + finishTime, Values.SUCCESS.name(),
|
|
|
- String.valueOf(finishedMaps), String.valueOf(finishedReduces) });
|
|
|
+ new Keys[] {Keys.JOBID, Keys.FINISH_TIME,
|
|
|
+ Keys.JOB_STATUS, Keys.FINISHED_MAPS,
|
|
|
+ Keys.FINISHED_REDUCES},
|
|
|
+ new String[] {jobId, "" + finishTime,
|
|
|
+ Values.SUCCESS.name(),
|
|
|
+ String.valueOf(finishedMaps),
|
|
|
+ String.valueOf(finishedReduces)});
|
|
|
}
|
|
|
|
|
|
// close job file for this job
|
|
@@ -428,11 +444,18 @@ public class JobHistory {
|
|
|
PrintWriter writer = openJobs.get(logFileName);
|
|
|
if (null != writer){
|
|
|
JobHistory.log(writer, RecordTypes.Job,
|
|
|
- new Enum[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES,
|
|
|
- Keys.FAILED_MAPS, Keys.FAILED_REDUCES},
|
|
|
- new String[] {jobId, "" + finishTime, Values.SUCCESS.name(),
|
|
|
- String.valueOf(finishedMaps), String.valueOf(finishedReduces),
|
|
|
- String.valueOf(failedMaps), String.valueOf(failedReduces)});
|
|
|
+ new Keys[] {Keys.JOBID, Keys.FINISH_TIME,
|
|
|
+ Keys.JOB_STATUS, Keys.FINISHED_MAPS,
|
|
|
+ Keys.FINISHED_REDUCES,
|
|
|
+ Keys.FAILED_MAPS, Keys.FAILED_REDUCES,
|
|
|
+ Keys.COUNTERS},
|
|
|
+ new String[] {jobId, Long.toString(finishTime),
|
|
|
+ Values.SUCCESS.name(),
|
|
|
+ String.valueOf(finishedMaps),
|
|
|
+ String.valueOf(finishedReduces),
|
|
|
+ String.valueOf(failedMaps),
|
|
|
+ String.valueOf(failedReduces),
|
|
|
+ stringifyCounters(counters)});
|
|
|
writer.close();
|
|
|
openJobs.remove(logFileName);
|
|
|
}
|
|
@@ -451,7 +474,7 @@ public class JobHistory {
|
|
|
if (!disableHistory){
|
|
|
synchronized(MASTER_INDEX_LOG_FILE){
|
|
|
JobHistory.log(masterIndex, RecordTypes.Job,
|
|
|
- new Enum[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES },
|
|
|
+ new Keys[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES },
|
|
|
new String[] {jobid, String.valueOf(timestamp), Values.FAILED.name(), String.valueOf(finishedMaps),
|
|
|
String.valueOf(finishedReduces)});
|
|
|
}
|
|
@@ -459,7 +482,7 @@ public class JobHistory {
|
|
|
PrintWriter writer = openJobs.get(logFileName);
|
|
|
if (null != writer){
|
|
|
JobHistory.log(writer, RecordTypes.Job,
|
|
|
- new Enum[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES },
|
|
|
+ new Keys[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES },
|
|
|
new String[] {jobid, String.valueOf(timestamp), Values.FAILED.name(), String.valueOf(finishedMaps),
|
|
|
String.valueOf(finishedReduces)});
|
|
|
writer.close();
|
|
@@ -488,7 +511,8 @@ public class JobHistory {
|
|
|
if (!disableHistory){
|
|
|
PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
|
|
|
if (null != writer){
|
|
|
- JobHistory.log(writer, RecordTypes.Task, new Enum[]{Keys.TASKID, Keys.TASK_TYPE , Keys.START_TIME},
|
|
|
+ JobHistory.log(writer, RecordTypes.Task,
|
|
|
+ new Keys[]{Keys.TASKID, Keys.TASK_TYPE , Keys.START_TIME},
|
|
|
new String[]{taskId, taskType, String.valueOf(startTime)});
|
|
|
}
|
|
|
}
|
|
@@ -501,13 +525,17 @@ public class JobHistory {
|
|
|
* @param finishTime finish timeof task in ms
|
|
|
*/
|
|
|
public static void logFinished(String jobId, String taskId, String taskType,
|
|
|
- long finishTime){
|
|
|
+ long finishTime, Counters counters){
|
|
|
if (!disableHistory){
|
|
|
PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
|
|
|
if (null != writer){
|
|
|
- JobHistory.log(writer, RecordTypes.Task, new Enum[]{Keys.TASKID, Keys.TASK_TYPE,
|
|
|
- Keys.TASK_STATUS, Keys.FINISH_TIME},
|
|
|
- new String[]{ taskId, taskType, Values.SUCCESS.name(), String.valueOf(finishTime)});
|
|
|
+ JobHistory.log(writer, RecordTypes.Task,
|
|
|
+ new Keys[]{Keys.TASKID, Keys.TASK_TYPE,
|
|
|
+ Keys.TASK_STATUS, Keys.FINISH_TIME,
|
|
|
+ Keys.COUNTERS},
|
|
|
+ new String[]{ taskId, taskType, Values.SUCCESS.name(),
|
|
|
+ String.valueOf(finishTime),
|
|
|
+ stringifyCounters(counters)});
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -523,8 +551,9 @@ public class JobHistory {
|
|
|
if (!disableHistory){
|
|
|
PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
|
|
|
if (null != writer){
|
|
|
- JobHistory.log(writer, RecordTypes.Task, new Enum[]{Keys.TASKID, Keys.TASK_TYPE,
|
|
|
- Keys.TASK_STATUS, Keys.FINISH_TIME, Keys.ERROR},
|
|
|
+ JobHistory.log(writer, RecordTypes.Task,
|
|
|
+ new Keys[]{Keys.TASKID, Keys.TASK_TYPE,
|
|
|
+ Keys.TASK_STATUS, Keys.FINISH_TIME, Keys.ERROR},
|
|
|
new String[]{ taskId, taskType, Values.FAILED.name(), String.valueOf(time) , error});
|
|
|
}
|
|
|
}
|
|
@@ -560,8 +589,9 @@ public class JobHistory {
|
|
|
PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
|
|
|
if (null != writer){
|
|
|
JobHistory.log(writer, RecordTypes.MapAttempt,
|
|
|
- new Enum[]{ Keys.TASK_TYPE, Keys.TASKID,
|
|
|
- Keys.TASK_ATTEMPT_ID, Keys.START_TIME, Keys.HOSTNAME},
|
|
|
+ new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
|
|
|
+ Keys.TASK_ATTEMPT_ID, Keys.START_TIME,
|
|
|
+ Keys.HOSTNAME},
|
|
|
new String[]{Values.MAP.name(), taskId,
|
|
|
taskAttemptId, String.valueOf(startTime), hostName});
|
|
|
}
|
|
@@ -575,18 +605,22 @@ public class JobHistory {
|
|
|
* @param finishTime finish time
|
|
|
* @param hostName host name
|
|
|
*/
|
|
|
- public static void logFinished(String jobId, String taskId, String taskAttemptId, long finishTime, String hostName){
|
|
|
+ public static void logFinished(String jobId, String taskId,
|
|
|
+ String taskAttemptId, long finishTime,
|
|
|
+ String hostName){
|
|
|
if (!disableHistory){
|
|
|
PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
|
|
|
if (null != writer){
|
|
|
JobHistory.log(writer, RecordTypes.MapAttempt,
|
|
|
- new Enum[]{ Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
|
|
|
+ new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
|
|
|
+ Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
|
|
|
Keys.FINISH_TIME, Keys.HOSTNAME},
|
|
|
new String[]{Values.MAP.name(), taskId, taskAttemptId, Values.SUCCESS.name(),
|
|
|
String.valueOf(finishTime), hostName});
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
/**
|
|
|
* Log task attempt failed event.
|
|
|
* @param jobId jobid
|
|
@@ -602,7 +636,7 @@ public class JobHistory {
|
|
|
PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
|
|
|
if (null != writer){
|
|
|
JobHistory.log(writer, RecordTypes.MapAttempt,
|
|
|
- new Enum[]{Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
|
|
|
+ new Keys[]{Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
|
|
|
Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR},
|
|
|
new String[]{ Values.MAP.name(), taskId, taskAttemptId, Values.FAILED.name(),
|
|
|
String.valueOf(timestamp), hostName, error});
|
|
@@ -624,7 +658,7 @@ public class JobHistory {
|
|
|
PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
|
|
|
if (null != writer){
|
|
|
JobHistory.log(writer, RecordTypes.MapAttempt,
|
|
|
- new Enum[]{Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
|
|
|
+ new Keys[]{Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
|
|
|
Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR},
|
|
|
new String[]{ Values.MAP.name(), taskId, taskAttemptId, Values.KILLED.name(),
|
|
|
String.valueOf(timestamp), hostName, error});
|
|
@@ -651,7 +685,7 @@ public class JobHistory {
|
|
|
PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
|
|
|
if (null != writer){
|
|
|
JobHistory.log(writer, RecordTypes.ReduceAttempt,
|
|
|
- new Enum[]{ Keys.TASK_TYPE, Keys.TASKID,
|
|
|
+ new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
|
|
|
Keys.TASK_ATTEMPT_ID, Keys.START_TIME, Keys.HOSTNAME},
|
|
|
new String[]{Values.REDUCE.name(), taskId,
|
|
|
taskAttemptId, String.valueOf(startTime), hostName});
|
|
@@ -668,14 +702,18 @@ public class JobHistory {
|
|
|
* @param finishTime finish time of task
|
|
|
* @param hostName host name where task attempt executed
|
|
|
*/
|
|
|
- public static void logFinished(String jobId, String taskId, String taskAttemptId,
|
|
|
- long shuffleFinished, long sortFinished, long finishTime, String hostName){
|
|
|
+ public static void logFinished(String jobId, String taskId,
|
|
|
+ String taskAttemptId, long shuffleFinished,
|
|
|
+ long sortFinished, long finishTime,
|
|
|
+ String hostName){
|
|
|
if (!disableHistory){
|
|
|
PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
|
|
|
if (null != writer){
|
|
|
JobHistory.log(writer, RecordTypes.ReduceAttempt,
|
|
|
- new Enum[]{ Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
|
|
|
- Keys.SHUFFLE_FINISHED, Keys.SORT_FINISHED, Keys.FINISH_TIME, Keys.HOSTNAME},
|
|
|
+ new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
|
|
|
+ Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
|
|
|
+ Keys.SHUFFLE_FINISHED, Keys.SORT_FINISHED,
|
|
|
+ Keys.FINISH_TIME, Keys.HOSTNAME},
|
|
|
new String[]{Values.REDUCE.name(), taskId, taskAttemptId, Values.SUCCESS.name(),
|
|
|
String.valueOf(shuffleFinished), String.valueOf(sortFinished),
|
|
|
String.valueOf(finishTime), hostName});
|
|
@@ -697,7 +735,7 @@ public class JobHistory {
|
|
|
PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
|
|
|
if (null != writer){
|
|
|
JobHistory.log(writer, RecordTypes.ReduceAttempt,
|
|
|
- new Enum[]{ Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
|
|
|
+ new Keys[]{ Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
|
|
|
Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR },
|
|
|
new String[]{ Values.REDUCE.name(), taskId, taskAttemptId, Values.FAILED.name(),
|
|
|
String.valueOf(timestamp), hostName, error });
|
|
@@ -719,8 +757,10 @@ public class JobHistory {
|
|
|
PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
|
|
|
if (null != writer){
|
|
|
JobHistory.log(writer, RecordTypes.ReduceAttempt,
|
|
|
- new Enum[]{ Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
|
|
|
- Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR },
|
|
|
+ new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
|
|
|
+ Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
|
|
|
+ Keys.FINISH_TIME, Keys.HOSTNAME,
|
|
|
+ Keys.ERROR },
|
|
|
new String[]{ Values.REDUCE.name(), taskId, taskAttemptId, Values.KILLED.name(),
|
|
|
String.valueOf(timestamp), hostName, error });
|
|
|
}
|
|
@@ -728,6 +768,33 @@ public class JobHistory {
|
|
|
}
|
|
|
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Convert a counters object into a string
|
|
|
+ * @param counters the counters to stringify
|
|
|
+ * @return the resulting string
|
|
|
+ */
|
|
|
+ private static String stringifyCounters(Counters counters) {
|
|
|
+ StringBuffer buffer = new StringBuffer();
|
|
|
+ for(String groupName: counters.getGroupNames()){
|
|
|
+ Counters.Group group = counters.getGroup(groupName);
|
|
|
+ boolean first = true;
|
|
|
+ for(String counterName: group.getCounterNames()) {
|
|
|
+ if (first) {
|
|
|
+ first = false;
|
|
|
+ } else {
|
|
|
+ buffer.append(',');
|
|
|
+ }
|
|
|
+ buffer.append(groupName);
|
|
|
+ buffer.append('.');
|
|
|
+ buffer.append(counterName);
|
|
|
+ buffer.append('=');
|
|
|
+ buffer.append(group.getCounter(counterName));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return buffer.toString();
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Callback interface for reading back log events from JobHistory. This interface
|
|
|
* should be implemented and passed to JobHistory.parseHistory()
|
|
@@ -782,17 +849,22 @@ public class JobHistory {
|
|
|
// find job that started more than one month back and remove them
|
|
|
// for jobtracker instances which dont have a job in past one month
|
|
|
// remove the jobtracker start timestamp as well.
|
|
|
- for (Map<String, JobHistory.JobInfo> jobs :
|
|
|
- jobTrackersToJobs.values()) {
|
|
|
- for(Iterator iter = jobs.keySet().iterator(); iter.hasNext(); iter.next()){
|
|
|
- JobHistory.JobInfo job = jobs.get(iter.next());
|
|
|
- if (now - job.getLong(Keys.SUBMIT_TIME) > THIRTY_DAYS_IN_MS) {
|
|
|
- iter.remove();
|
|
|
- }
|
|
|
- if (jobs.size() == 0){
|
|
|
- iter.remove();
|
|
|
+ Iterator<Map<String, JobHistory.JobInfo>> jobTrackerItr =
|
|
|
+ jobTrackersToJobs.values().iterator();
|
|
|
+ while (jobTrackerItr.hasNext()) {
|
|
|
+ Map<String, JobHistory.JobInfo> jobs = jobTrackerItr.next();
|
|
|
+ Iterator<Map.Entry<String, JobHistory.JobInfo>> jobItr =
|
|
|
+ jobs.entrySet().iterator();
|
|
|
+ while (jobItr.hasNext()) {
|
|
|
+ Map.Entry<String, JobHistory.JobInfo> item = jobItr.next();
|
|
|
+ if (now - item.getValue().getLong(Keys.SUBMIT_TIME) >
|
|
|
+ THIRTY_DAYS_IN_MS) {
|
|
|
+ jobItr.remove();
|
|
|
}
|
|
|
}
|
|
|
+ if (jobs.size() == 0){
|
|
|
+ jobTrackerItr.remove();
|
|
|
+ }
|
|
|
}
|
|
|
masterIndex.close();
|
|
|
masterIndex = new PrintWriter(logFile);
|