|
@@ -71,23 +71,17 @@ public class JobHistory {
|
|
|
* Record types are identifiers for each line of log in history files.
|
|
|
* A record type appears as the first token in a single line of log.
|
|
|
*/
|
|
|
- public static enum RecordTypes {
|
|
|
- Jobtracker, Job, Task, MapAttempt, ReduceAttempt
|
|
|
- }
|
|
|
-
|
|
|
+ public static enum RecordTypes {Jobtracker, Job, Task, MapAttempt, ReduceAttempt};
|
|
|
/**
|
|
|
* Job history files contain key="value" pairs, where keys belong to this enum.
|
|
|
* It acts as a global namespace for all keys.
|
|
|
*/
|
|
|
- public static enum Keys {
|
|
|
- JOBTRACKERID,
|
|
|
- START_TIME, FINISH_TIME, JOBID, JOBNAME, USER, JOBCONF, SUBMIT_TIME,
|
|
|
- LAUNCH_TIME, TOTAL_MAPS, TOTAL_REDUCES, FAILED_MAPS, FAILED_REDUCES,
|
|
|
- FINISHED_MAPS, FINISHED_REDUCES, JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE,
|
|
|
- ERROR, TASK_ATTEMPT_ID, TASK_STATUS, COPY_PHASE, SORT_PHASE, REDUCE_PHASE,
|
|
|
- SHUFFLE_FINISHED, SORT_FINISHED, COUNTERS
|
|
|
- }
|
|
|
-
|
|
|
+ public static enum Keys { JOBTRACKERID,
|
|
|
+ START_TIME, FINISH_TIME, JOBID, JOBNAME, USER, JOBCONF, SUBMIT_TIME, LAUNCH_TIME,
|
|
|
+ TOTAL_MAPS, TOTAL_REDUCES, FAILED_MAPS, FAILED_REDUCES, FINISHED_MAPS, FINISHED_REDUCES,
|
|
|
+ JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE, ERROR, TASK_ATTEMPT_ID, TASK_STATUS,
|
|
|
+ COPY_PHASE, SORT_PHASE, REDUCE_PHASE, SHUFFLE_FINISHED, SORT_FINISHED
|
|
|
+ };
|
|
|
/**
|
|
|
* This enum contains some of the values commonly used by history log events.
|
|
|
* since values in history can only be strings - Values.name() is used in
|
|
@@ -95,8 +89,7 @@ public class JobHistory {
|
|
|
*/
|
|
|
public static enum Values {
|
|
|
SUCCESS, FAILED, KILLED, MAP, REDUCE
|
|
|
- }
|
|
|
-
|
|
|
+ };
|
|
|
// temp buffer for parsed dataa
|
|
|
private static Map<Keys,String> parseBuffer = new HashMap<Keys, String>();
|
|
|
|
|
@@ -188,8 +181,7 @@ public class JobHistory {
|
|
|
* @param value value
|
|
|
*/
|
|
|
|
|
|
- static void log(PrintWriter out, RecordTypes recordType, Keys key,
|
|
|
- String value){
|
|
|
+ static void log(PrintWriter out, RecordTypes recordType, Enum key, String value){
|
|
|
out.println(recordType.name() + DELIMITER + key + "=\"" + value + "\"");
|
|
|
out.flush();
|
|
|
}
|
|
@@ -202,8 +194,7 @@ public class JobHistory {
|
|
|
* @param values type of log event
|
|
|
*/
|
|
|
|
|
|
- static void log(PrintWriter out, RecordTypes recordType, Keys[] keys,
|
|
|
- String[] values){
|
|
|
+ static void log(PrintWriter out, RecordTypes recordType, Enum[] keys, String[] values){
|
|
|
StringBuffer buf = new StringBuffer(recordType.name());
|
|
|
buf.append(DELIMITER);
|
|
|
for(int i =0; i< keys.length; i++){
|
|
@@ -349,7 +340,7 @@ public class JobHistory {
|
|
|
if (!disableHistory){
|
|
|
synchronized(MASTER_INDEX_LOG_FILE){
|
|
|
JobHistory.log(masterIndex, RecordTypes.Job,
|
|
|
- new Keys[]{Keys.JOBID, Keys.JOBNAME, Keys.USER, Keys.SUBMIT_TIME, Keys.JOBCONF },
|
|
|
+ new Enum[]{Keys.JOBID, Keys.JOBNAME, Keys.USER, Keys.SUBMIT_TIME, Keys.JOBCONF },
|
|
|
new String[]{jobId, jobName, user,
|
|
|
String.valueOf(submitTime), jobConfPath}
|
|
|
);
|
|
@@ -363,7 +354,7 @@ public class JobHistory {
|
|
|
openJobs.put(logFileName, writer);
|
|
|
// add to writer as well
|
|
|
JobHistory.log(writer, RecordTypes.Job,
|
|
|
- new Keys[]{Keys.JOBID, Keys.JOBNAME, Keys.USER, Keys.SUBMIT_TIME, Keys.JOBCONF },
|
|
|
+ new Enum[]{Keys.JOBID, Keys.JOBNAME, Keys.USER, Keys.SUBMIT_TIME, Keys.JOBCONF },
|
|
|
new String[]{jobId, jobName, user,
|
|
|
String.valueOf(submitTime) , jobConfPath}
|
|
|
);
|
|
@@ -398,7 +389,7 @@ public class JobHistory {
|
|
|
if (!disableHistory){
|
|
|
synchronized(MASTER_INDEX_LOG_FILE){
|
|
|
JobHistory.log(masterIndex, RecordTypes.Job,
|
|
|
- new Keys[] {Keys.JOBID, Keys.LAUNCH_TIME, Keys.TOTAL_MAPS, Keys.TOTAL_REDUCES },
|
|
|
+ new Enum[] {Keys.JOBID, Keys.LAUNCH_TIME, Keys.TOTAL_MAPS, Keys.TOTAL_REDUCES },
|
|
|
new String[] {jobId, String.valueOf(startTime),
|
|
|
String.valueOf(totalMaps), String.valueOf(totalReduces) });
|
|
|
}
|
|
@@ -408,7 +399,7 @@ public class JobHistory {
|
|
|
|
|
|
if (null != writer){
|
|
|
JobHistory.log(writer, RecordTypes.Job,
|
|
|
- new Keys[] {Keys.JOBID, Keys.LAUNCH_TIME, Keys.TOTAL_MAPS, Keys.TOTAL_REDUCES },
|
|
|
+ new Enum[] {Keys.JOBID, Keys.LAUNCH_TIME, Keys.TOTAL_MAPS, Keys.TOTAL_REDUCES },
|
|
|
new String[] {jobId, String.valueOf(startTime), String.valueOf(totalMaps), String.valueOf(totalReduces)});
|
|
|
}
|
|
|
}
|
|
@@ -421,22 +412,15 @@ public class JobHistory {
|
|
|
* @param finishedReduces no of reduces finished sucessfully.
|
|
|
* @param failedMaps no of failed map tasks.
|
|
|
* @param failedReduces no of failed reduce tasks.
|
|
|
- * @param counters the counters from the job
|
|
|
*/
|
|
|
- public static void logFinished(String jobId, long finishTime,
|
|
|
- int finishedMaps, int finishedReduces,
|
|
|
- int failedMaps, int failedReduces,
|
|
|
- Counters counters){
|
|
|
+ public static void logFinished(String jobId, long finishTime, int finishedMaps, int finishedReduces,
|
|
|
+ int failedMaps, int failedReduces){
|
|
|
if (!disableHistory){
|
|
|
synchronized(MASTER_INDEX_LOG_FILE){
|
|
|
JobHistory.log(masterIndex, RecordTypes.Job,
|
|
|
- new Keys[] {Keys.JOBID, Keys.FINISH_TIME,
|
|
|
- Keys.JOB_STATUS, Keys.FINISHED_MAPS,
|
|
|
- Keys.FINISHED_REDUCES},
|
|
|
- new String[] {jobId, "" + finishTime,
|
|
|
- Values.SUCCESS.name(),
|
|
|
- String.valueOf(finishedMaps),
|
|
|
- String.valueOf(finishedReduces)});
|
|
|
+ new Enum[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES },
|
|
|
+ new String[] {jobId, "" + finishTime, Values.SUCCESS.name(),
|
|
|
+ String.valueOf(finishedMaps), String.valueOf(finishedReduces) });
|
|
|
}
|
|
|
|
|
|
// close job file for this job
|
|
@@ -444,18 +428,11 @@ public class JobHistory {
|
|
|
PrintWriter writer = openJobs.get(logFileName);
|
|
|
if (null != writer){
|
|
|
JobHistory.log(writer, RecordTypes.Job,
|
|
|
- new Keys[] {Keys.JOBID, Keys.FINISH_TIME,
|
|
|
- Keys.JOB_STATUS, Keys.FINISHED_MAPS,
|
|
|
- Keys.FINISHED_REDUCES,
|
|
|
- Keys.FAILED_MAPS, Keys.FAILED_REDUCES,
|
|
|
- Keys.COUNTERS},
|
|
|
- new String[] {jobId, Long.toString(finishTime),
|
|
|
- Values.SUCCESS.name(),
|
|
|
- String.valueOf(finishedMaps),
|
|
|
- String.valueOf(finishedReduces),
|
|
|
- String.valueOf(failedMaps),
|
|
|
- String.valueOf(failedReduces),
|
|
|
- stringifyCounters(counters)});
|
|
|
+ new Enum[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES,
|
|
|
+ Keys.FAILED_MAPS, Keys.FAILED_REDUCES},
|
|
|
+ new String[] {jobId, "" + finishTime, Values.SUCCESS.name(),
|
|
|
+ String.valueOf(finishedMaps), String.valueOf(finishedReduces),
|
|
|
+ String.valueOf(failedMaps), String.valueOf(failedReduces)});
|
|
|
writer.close();
|
|
|
openJobs.remove(logFileName);
|
|
|
}
|
|
@@ -474,7 +451,7 @@ public class JobHistory {
|
|
|
if (!disableHistory){
|
|
|
synchronized(MASTER_INDEX_LOG_FILE){
|
|
|
JobHistory.log(masterIndex, RecordTypes.Job,
|
|
|
- new Keys[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES },
|
|
|
+ new Enum[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES },
|
|
|
new String[] {jobid, String.valueOf(timestamp), Values.FAILED.name(), String.valueOf(finishedMaps),
|
|
|
String.valueOf(finishedReduces)});
|
|
|
}
|
|
@@ -482,7 +459,7 @@ public class JobHistory {
|
|
|
PrintWriter writer = openJobs.get(logFileName);
|
|
|
if (null != writer){
|
|
|
JobHistory.log(writer, RecordTypes.Job,
|
|
|
- new Keys[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES },
|
|
|
+ new Enum[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES },
|
|
|
new String[] {jobid, String.valueOf(timestamp), Values.FAILED.name(), String.valueOf(finishedMaps),
|
|
|
String.valueOf(finishedReduces)});
|
|
|
writer.close();
|
|
@@ -511,8 +488,7 @@ public class JobHistory {
|
|
|
if (!disableHistory){
|
|
|
PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
|
|
|
if (null != writer){
|
|
|
- JobHistory.log(writer, RecordTypes.Task,
|
|
|
- new Keys[]{Keys.TASKID, Keys.TASK_TYPE , Keys.START_TIME},
|
|
|
+ JobHistory.log(writer, RecordTypes.Task, new Enum[]{Keys.TASKID, Keys.TASK_TYPE , Keys.START_TIME},
|
|
|
new String[]{taskId, taskType, String.valueOf(startTime)});
|
|
|
}
|
|
|
}
|
|
@@ -525,17 +501,13 @@ public class JobHistory {
|
|
|
* @param finishTime finish timeof task in ms
|
|
|
*/
|
|
|
public static void logFinished(String jobId, String taskId, String taskType,
|
|
|
- long finishTime, Counters counters){
|
|
|
+ long finishTime){
|
|
|
if (!disableHistory){
|
|
|
PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
|
|
|
if (null != writer){
|
|
|
- JobHistory.log(writer, RecordTypes.Task,
|
|
|
- new Keys[]{Keys.TASKID, Keys.TASK_TYPE,
|
|
|
- Keys.TASK_STATUS, Keys.FINISH_TIME,
|
|
|
- Keys.COUNTERS},
|
|
|
- new String[]{ taskId, taskType, Values.SUCCESS.name(),
|
|
|
- String.valueOf(finishTime),
|
|
|
- stringifyCounters(counters)});
|
|
|
+ JobHistory.log(writer, RecordTypes.Task, new Enum[]{Keys.TASKID, Keys.TASK_TYPE,
|
|
|
+ Keys.TASK_STATUS, Keys.FINISH_TIME},
|
|
|
+ new String[]{ taskId, taskType, Values.SUCCESS.name(), String.valueOf(finishTime)});
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -551,9 +523,8 @@ public class JobHistory {
|
|
|
if (!disableHistory){
|
|
|
PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
|
|
|
if (null != writer){
|
|
|
- JobHistory.log(writer, RecordTypes.Task,
|
|
|
- new Keys[]{Keys.TASKID, Keys.TASK_TYPE,
|
|
|
- Keys.TASK_STATUS, Keys.FINISH_TIME, Keys.ERROR},
|
|
|
+ JobHistory.log(writer, RecordTypes.Task, new Enum[]{Keys.TASKID, Keys.TASK_TYPE,
|
|
|
+ Keys.TASK_STATUS, Keys.FINISH_TIME, Keys.ERROR},
|
|
|
new String[]{ taskId, taskType, Values.FAILED.name(), String.valueOf(time) , error});
|
|
|
}
|
|
|
}
|
|
@@ -589,9 +560,8 @@ public class JobHistory {
|
|
|
PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
|
|
|
if (null != writer){
|
|
|
JobHistory.log(writer, RecordTypes.MapAttempt,
|
|
|
- new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
|
|
|
- Keys.TASK_ATTEMPT_ID, Keys.START_TIME,
|
|
|
- Keys.HOSTNAME},
|
|
|
+ new Enum[]{ Keys.TASK_TYPE, Keys.TASKID,
|
|
|
+ Keys.TASK_ATTEMPT_ID, Keys.START_TIME, Keys.HOSTNAME},
|
|
|
new String[]{Values.MAP.name(), taskId,
|
|
|
taskAttemptId, String.valueOf(startTime), hostName});
|
|
|
}
|
|
@@ -605,22 +575,18 @@ public class JobHistory {
|
|
|
* @param finishTime finish time
|
|
|
* @param hostName host name
|
|
|
*/
|
|
|
- public static void logFinished(String jobId, String taskId,
|
|
|
- String taskAttemptId, long finishTime,
|
|
|
- String hostName){
|
|
|
+ public static void logFinished(String jobId, String taskId, String taskAttemptId, long finishTime, String hostName){
|
|
|
if (!disableHistory){
|
|
|
PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
|
|
|
if (null != writer){
|
|
|
JobHistory.log(writer, RecordTypes.MapAttempt,
|
|
|
- new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
|
|
|
- Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
|
|
|
+ new Enum[]{ Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
|
|
|
Keys.FINISH_TIME, Keys.HOSTNAME},
|
|
|
new String[]{Values.MAP.name(), taskId, taskAttemptId, Values.SUCCESS.name(),
|
|
|
String.valueOf(finishTime), hostName});
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
/**
|
|
|
* Log task attempt failed event.
|
|
|
* @param jobId jobid
|
|
@@ -636,7 +602,7 @@ public class JobHistory {
|
|
|
PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
|
|
|
if (null != writer){
|
|
|
JobHistory.log(writer, RecordTypes.MapAttempt,
|
|
|
- new Keys[]{Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
|
|
|
+ new Enum[]{Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
|
|
|
Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR},
|
|
|
new String[]{ Values.MAP.name(), taskId, taskAttemptId, Values.FAILED.name(),
|
|
|
String.valueOf(timestamp), hostName, error});
|
|
@@ -658,7 +624,7 @@ public class JobHistory {
|
|
|
PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
|
|
|
if (null != writer){
|
|
|
JobHistory.log(writer, RecordTypes.MapAttempt,
|
|
|
- new Keys[]{Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
|
|
|
+ new Enum[]{Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
|
|
|
Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR},
|
|
|
new String[]{ Values.MAP.name(), taskId, taskAttemptId, Values.KILLED.name(),
|
|
|
String.valueOf(timestamp), hostName, error});
|
|
@@ -685,7 +651,7 @@ public class JobHistory {
|
|
|
PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
|
|
|
if (null != writer){
|
|
|
JobHistory.log(writer, RecordTypes.ReduceAttempt,
|
|
|
- new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
|
|
|
+ new Enum[]{ Keys.TASK_TYPE, Keys.TASKID,
|
|
|
Keys.TASK_ATTEMPT_ID, Keys.START_TIME, Keys.HOSTNAME},
|
|
|
new String[]{Values.REDUCE.name(), taskId,
|
|
|
taskAttemptId, String.valueOf(startTime), hostName});
|
|
@@ -702,18 +668,14 @@ public class JobHistory {
|
|
|
* @param finishTime finish time of task
|
|
|
* @param hostName host name where task attempt executed
|
|
|
*/
|
|
|
- public static void logFinished(String jobId, String taskId,
|
|
|
- String taskAttemptId, long shuffleFinished,
|
|
|
- long sortFinished, long finishTime,
|
|
|
- String hostName){
|
|
|
+ public static void logFinished(String jobId, String taskId, String taskAttemptId,
|
|
|
+ long shuffleFinished, long sortFinished, long finishTime, String hostName){
|
|
|
if (!disableHistory){
|
|
|
PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
|
|
|
if (null != writer){
|
|
|
JobHistory.log(writer, RecordTypes.ReduceAttempt,
|
|
|
- new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
|
|
|
- Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
|
|
|
- Keys.SHUFFLE_FINISHED, Keys.SORT_FINISHED,
|
|
|
- Keys.FINISH_TIME, Keys.HOSTNAME},
|
|
|
+ new Enum[]{ Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
|
|
|
+ Keys.SHUFFLE_FINISHED, Keys.SORT_FINISHED, Keys.FINISH_TIME, Keys.HOSTNAME},
|
|
|
new String[]{Values.REDUCE.name(), taskId, taskAttemptId, Values.SUCCESS.name(),
|
|
|
String.valueOf(shuffleFinished), String.valueOf(sortFinished),
|
|
|
String.valueOf(finishTime), hostName});
|
|
@@ -735,7 +697,7 @@ public class JobHistory {
|
|
|
PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
|
|
|
if (null != writer){
|
|
|
JobHistory.log(writer, RecordTypes.ReduceAttempt,
|
|
|
- new Keys[]{ Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
|
|
|
+ new Enum[]{ Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
|
|
|
Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR },
|
|
|
new String[]{ Values.REDUCE.name(), taskId, taskAttemptId, Values.FAILED.name(),
|
|
|
String.valueOf(timestamp), hostName, error });
|
|
@@ -757,10 +719,8 @@ public class JobHistory {
|
|
|
PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
|
|
|
if (null != writer){
|
|
|
JobHistory.log(writer, RecordTypes.ReduceAttempt,
|
|
|
- new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
|
|
|
- Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
|
|
|
- Keys.FINISH_TIME, Keys.HOSTNAME,
|
|
|
- Keys.ERROR },
|
|
|
+ new Enum[]{ Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
|
|
|
+ Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR },
|
|
|
new String[]{ Values.REDUCE.name(), taskId, taskAttemptId, Values.KILLED.name(),
|
|
|
String.valueOf(timestamp), hostName, error });
|
|
|
}
|
|
@@ -768,33 +728,6 @@ public class JobHistory {
|
|
|
}
|
|
|
|
|
|
}
|
|
|
-
|
|
|
- /**
|
|
|
- * Convert a counters object into a string
|
|
|
- * @param counters the counters to stringify
|
|
|
- * @return the resulting string
|
|
|
- */
|
|
|
- private static String stringifyCounters(Counters counters) {
|
|
|
- StringBuffer buffer = new StringBuffer();
|
|
|
- for(String groupName: counters.getGroupNames()){
|
|
|
- Counters.Group group = counters.getGroup(groupName);
|
|
|
- boolean first = true;
|
|
|
- for(String counterName: group.getCounterNames()) {
|
|
|
- if (first) {
|
|
|
- first = false;
|
|
|
- } else {
|
|
|
- buffer.append(',');
|
|
|
- }
|
|
|
- buffer.append(groupName);
|
|
|
- buffer.append('.');
|
|
|
- buffer.append(counterName);
|
|
|
- buffer.append('=');
|
|
|
- buffer.append(group.getCounter(counterName));
|
|
|
- }
|
|
|
- }
|
|
|
- return buffer.toString();
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Callback interface for reading back log events from JobHistory. This interface
|
|
|
* should be implemented and passed to JobHistory.parseHistory()
|
|
@@ -849,21 +782,16 @@ public class JobHistory {
|
|
|
// find job that started more than one month back and remove them
|
|
|
// for jobtracker instances which dont have a job in past one month
|
|
|
// remove the jobtracker start timestamp as well.
|
|
|
- Iterator<Map<String, JobHistory.JobInfo>> jobTrackerItr =
|
|
|
- jobTrackersToJobs.values().iterator();
|
|
|
- while (jobTrackerItr.hasNext()) {
|
|
|
- Map<String, JobHistory.JobInfo> jobs = jobTrackerItr.next();
|
|
|
- Iterator<Map.Entry<String, JobHistory.JobInfo>> jobItr =
|
|
|
- jobs.entrySet().iterator();
|
|
|
- while (jobItr.hasNext()) {
|
|
|
- Map.Entry<String, JobHistory.JobInfo> item = jobItr.next();
|
|
|
- if (now - item.getValue().getLong(Keys.SUBMIT_TIME) >
|
|
|
- THIRTY_DAYS_IN_MS) {
|
|
|
- jobItr.remove();
|
|
|
+ for (Map<String, JobHistory.JobInfo> jobs :
|
|
|
+ jobTrackersToJobs.values()) {
|
|
|
+ for(Iterator iter = jobs.keySet().iterator(); iter.hasNext(); iter.next()){
|
|
|
+ JobHistory.JobInfo job = jobs.get(iter.next());
|
|
|
+ if (now - job.getLong(Keys.SUBMIT_TIME) > THIRTY_DAYS_IN_MS) {
|
|
|
+ iter.remove();
|
|
|
+ }
|
|
|
+ if (jobs.size() == 0){
|
|
|
+ iter.remove();
|
|
|
}
|
|
|
- }
|
|
|
- if (jobs.size() == 0){
|
|
|
- jobTrackerItr.remove();
|
|
|
}
|
|
|
}
|
|
|
masterIndex.close();
|