|
@@ -60,10 +60,10 @@ public class JobHistory {
|
|
|
* It acts as a global namespace for all keys.
|
|
|
*/
|
|
|
public static enum Keys { JOBTRACKERID,
|
|
|
- START_TIME, FINISH_TIME, JOBID, JOBNAME, USER, JOBCONF,SUBMIT_TIME, LAUNCH_TIME,
|
|
|
- TOTAL_MAPS, TOTAL_REDUCES, FAILED_MAPS, FAILED_REDUCES, FINISHED_MAPS, FINISHED_REDUCES,
|
|
|
- JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE, ERROR, TASK_ATTEMPT_ID, TASK_STATUS,
|
|
|
- COPY_PHASE, SORT_PHASE, REDUCE_PHASE, SHUFFLE_FINISHED, SORT_FINISHED
|
|
|
+ START_TIME, FINISH_TIME, JOBID, JOBNAME, USER, JOBCONF,SUBMIT_TIME, LAUNCH_TIME,
|
|
|
+ TOTAL_MAPS, TOTAL_REDUCES, FAILED_MAPS, FAILED_REDUCES, FINISHED_MAPS, FINISHED_REDUCES,
|
|
|
+ JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE, ERROR, TASK_ATTEMPT_ID, TASK_STATUS,
|
|
|
+ COPY_PHASE, SORT_PHASE, REDUCE_PHASE, SHUFFLE_FINISHED, SORT_FINISHED
|
|
|
};
|
|
|
/**
|
|
|
* This enum contains some of the values commonly used by history log events.
|
|
@@ -94,7 +94,7 @@ public class JobHistory {
|
|
|
}
|
|
|
masterIndex =
|
|
|
new PrintWriter(
|
|
|
- new FileOutputStream(new File( LOG_DIR + File.separator + MASTER_INDEX_LOG_FILE), true )) ;
|
|
|
+ new FileOutputStream(new File( LOG_DIR + File.separator + MASTER_INDEX_LOG_FILE), true )) ;
|
|
|
// add jobtracker id = tracker start time
|
|
|
log(masterIndex, RecordTypes.Jobtracker, Keys.START_TIME, JOBTRACKER_START_TIME);
|
|
|
}catch(IOException e){
|
|
@@ -114,17 +114,17 @@ public class JobHistory {
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
public static void parseHistory(File path, Listener l) throws IOException{
|
|
|
- BufferedReader reader = new BufferedReader(new FileReader(path));
|
|
|
- String line = null ;
|
|
|
- StringBuffer buf = new StringBuffer();
|
|
|
- while ((line = reader.readLine())!= null){
|
|
|
- buf.append(line);
|
|
|
- if( ! line.trim().endsWith("\"")){
|
|
|
- continue ;
|
|
|
- }
|
|
|
- parseLine(buf.toString(), l );
|
|
|
- buf = new StringBuffer();
|
|
|
+ BufferedReader reader = new BufferedReader(new FileReader(path));
|
|
|
+ String line = null ;
|
|
|
+ StringBuffer buf = new StringBuffer();
|
|
|
+ while ((line = reader.readLine())!= null){
|
|
|
+ buf.append(line);
|
|
|
+ if( ! line.trim().endsWith("\"")){
|
|
|
+ continue ;
|
|
|
}
|
|
|
+ parseLine(buf.toString(), l );
|
|
|
+ buf = new StringBuffer();
|
|
|
+ }
|
|
|
}
|
|
|
/**
|
|
|
* Parse a single line of history.
|
|
@@ -305,13 +305,13 @@ public class JobHistory {
|
|
|
* @param jobConf path to job conf xml file in HDFS.
|
|
|
*/
|
|
|
public static void logSubmitted(String jobId, String jobName, String user,
|
|
|
- long submitTime, String jobConf){
|
|
|
+ long submitTime, String jobConf){
|
|
|
|
|
|
if( ! disableHistory ){
|
|
|
synchronized(MASTER_INDEX_LOG_FILE){
|
|
|
JobHistory.log(masterIndex, RecordTypes.Job,
|
|
|
- new Enum[]{Keys.JOBID, Keys.JOBNAME, Keys.USER, Keys.SUBMIT_TIME, Keys.JOBCONF },
|
|
|
- new String[]{jobId, jobName, user, String.valueOf(submitTime),jobConf });
|
|
|
+ new Enum[]{Keys.JOBID, Keys.JOBNAME, Keys.USER, Keys.SUBMIT_TIME, Keys.JOBCONF },
|
|
|
+ new String[]{jobId, jobName, user, String.valueOf(submitTime),jobConf });
|
|
|
}
|
|
|
// setup the history log file for this job
|
|
|
String logFileName = JOBTRACKER_START_TIME + "_" + jobId ;
|
|
@@ -322,8 +322,8 @@ public class JobHistory {
|
|
|
openJobs.put(logFileName, writer);
|
|
|
// add to writer as well
|
|
|
JobHistory.log(writer, RecordTypes.Job,
|
|
|
- new Enum[]{Keys.JOBID, Keys.JOBNAME, Keys.USER, Keys.SUBMIT_TIME, Keys.JOBCONF },
|
|
|
- new String[]{jobId, jobName, user, String.valueOf(submitTime) ,jobConf});
|
|
|
+ new Enum[]{Keys.JOBID, Keys.JOBNAME, Keys.USER, Keys.SUBMIT_TIME, Keys.JOBCONF },
|
|
|
+ new String[]{jobId, jobName, user, String.valueOf(submitTime) ,jobConf});
|
|
|
|
|
|
}catch(IOException e){
|
|
|
LOG.error("Failed creating job history log file, disabling history", e);
|
|
@@ -342,9 +342,9 @@ public class JobHistory {
|
|
|
if( ! disableHistory ){
|
|
|
synchronized(MASTER_INDEX_LOG_FILE){
|
|
|
JobHistory.log(masterIndex, RecordTypes.Job,
|
|
|
- new Enum[] {Keys.JOBID, Keys.LAUNCH_TIME, Keys.TOTAL_MAPS, Keys.TOTAL_REDUCES },
|
|
|
- new String[] {jobId, String.valueOf(startTime),
|
|
|
- String.valueOf(totalMaps), String.valueOf(totalReduces) } ) ;
|
|
|
+ new Enum[] {Keys.JOBID, Keys.LAUNCH_TIME, Keys.TOTAL_MAPS, Keys.TOTAL_REDUCES },
|
|
|
+ new String[] {jobId, String.valueOf(startTime),
|
|
|
+ String.valueOf(totalMaps), String.valueOf(totalReduces) } ) ;
|
|
|
}
|
|
|
|
|
|
String logFileName = JOBTRACKER_START_TIME + "_" + jobId ;
|
|
@@ -352,8 +352,8 @@ public class JobHistory {
|
|
|
|
|
|
if( null != writer ){
|
|
|
JobHistory.log(writer, RecordTypes.Job,
|
|
|
- new Enum[] {Keys.JOBID, Keys.LAUNCH_TIME,Keys.TOTAL_MAPS, Keys.TOTAL_REDUCES },
|
|
|
- new String[] {jobId, String.valueOf(startTime), String.valueOf(totalMaps), String.valueOf(totalReduces)} ) ;
|
|
|
+ new Enum[] {Keys.JOBID, Keys.LAUNCH_TIME,Keys.TOTAL_MAPS, Keys.TOTAL_REDUCES },
|
|
|
+ new String[] {jobId, String.valueOf(startTime), String.valueOf(totalMaps), String.valueOf(totalReduces)} ) ;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -367,13 +367,13 @@ public class JobHistory {
|
|
|
* @param failedReduces no of failed reduce tasks.
|
|
|
*/
|
|
|
public static void logFinished(String jobId, long finishTime, int finishedMaps, int finishedReduces,
|
|
|
- int failedMaps, int failedReduces){
|
|
|
+ int failedMaps, int failedReduces){
|
|
|
if( ! disableHistory ){
|
|
|
synchronized(MASTER_INDEX_LOG_FILE){
|
|
|
JobHistory.log(masterIndex, RecordTypes.Job,
|
|
|
- new Enum[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES },
|
|
|
- new String[] {jobId, "" + finishTime, Values.SUCCESS.name(),
|
|
|
- String.valueOf(finishedMaps), String.valueOf(finishedReduces) } ) ;
|
|
|
+ new Enum[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES },
|
|
|
+ new String[] {jobId, "" + finishTime, Values.SUCCESS.name(),
|
|
|
+ String.valueOf(finishedMaps), String.valueOf(finishedReduces) } ) ;
|
|
|
}
|
|
|
|
|
|
// close job file for this job
|
|
@@ -381,11 +381,11 @@ public class JobHistory {
|
|
|
PrintWriter writer = openJobs.get(logFileName);
|
|
|
if( null != writer){
|
|
|
JobHistory.log(writer, RecordTypes.Job,
|
|
|
- new Enum[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES,
|
|
|
- Keys.FAILED_MAPS, Keys.FAILED_REDUCES},
|
|
|
- new String[] {jobId, "" + finishTime, Values.SUCCESS.name(),
|
|
|
- String.valueOf(finishedMaps), String.valueOf(finishedReduces),
|
|
|
- String.valueOf(failedMaps), String.valueOf(failedReduces)} ) ;
|
|
|
+ new Enum[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES,
|
|
|
+ Keys.FAILED_MAPS, Keys.FAILED_REDUCES},
|
|
|
+ new String[] {jobId, "" + finishTime, Values.SUCCESS.name(),
|
|
|
+ String.valueOf(finishedMaps), String.valueOf(finishedReduces),
|
|
|
+ String.valueOf(failedMaps), String.valueOf(failedReduces)} ) ;
|
|
|
writer.close();
|
|
|
openJobs.remove(logFileName);
|
|
|
}
|
|
@@ -404,20 +404,20 @@ public class JobHistory {
|
|
|
if( ! disableHistory ){
|
|
|
synchronized(MASTER_INDEX_LOG_FILE){
|
|
|
JobHistory.log(masterIndex, RecordTypes.Job,
|
|
|
- new Enum[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES },
|
|
|
- new String[] {jobid, String.valueOf(timestamp), Values.FAILED.name(), String.valueOf(finishedMaps),
|
|
|
- String.valueOf(finishedReduces)} ) ;
|
|
|
+ new Enum[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES },
|
|
|
+ new String[] {jobid, String.valueOf(timestamp), Values.FAILED.name(), String.valueOf(finishedMaps),
|
|
|
+ String.valueOf(finishedReduces)} ) ;
|
|
|
+ }
|
|
|
+ String logFileName = JOBTRACKER_START_TIME + "_" + jobid ;
|
|
|
+ PrintWriter writer = (PrintWriter)openJobs.get(logFileName);
|
|
|
+ if( null != writer){
|
|
|
+ JobHistory.log(writer, RecordTypes.Job,
|
|
|
+ new Enum[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS,Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES },
|
|
|
+ new String[] {jobid, String.valueOf(timestamp), Values.FAILED.name(), String.valueOf(finishedMaps),
|
|
|
+ String.valueOf(finishedReduces)} ) ;
|
|
|
+ writer.close();
|
|
|
+ openJobs.remove(logFileName);
|
|
|
}
|
|
|
- String logFileName = JOBTRACKER_START_TIME + "_" + jobid ;
|
|
|
- PrintWriter writer = (PrintWriter)openJobs.get(logFileName);
|
|
|
- if( null != writer){
|
|
|
- JobHistory.log(writer, RecordTypes.Job,
|
|
|
- new Enum[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS,Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES },
|
|
|
- new String[] {jobid, String.valueOf(timestamp), Values.FAILED.name(), String.valueOf(finishedMaps),
|
|
|
- String.valueOf(finishedReduces)} ) ;
|
|
|
- writer.close();
|
|
|
- openJobs.remove(logFileName);
|
|
|
- }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -437,12 +437,12 @@ public class JobHistory {
|
|
|
* @param startTime startTime of tip.
|
|
|
*/
|
|
|
public static void logStarted(String jobId, String taskId, String taskType,
|
|
|
- long startTime){
|
|
|
+ long startTime){
|
|
|
if( ! disableHistory ){
|
|
|
PrintWriter writer = (PrintWriter)openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
|
|
|
if( null != writer ){
|
|
|
JobHistory.log(writer, RecordTypes.Task, new Enum[]{Keys.TASKID, Keys.TASK_TYPE , Keys.START_TIME},
|
|
|
- new String[]{taskId, taskType, String.valueOf(startTime)}) ;
|
|
|
+ new String[]{taskId, taskType, String.valueOf(startTime)}) ;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -454,13 +454,13 @@ public class JobHistory {
|
|
|
* @param finishTime finish timeof task in ms
|
|
|
*/
|
|
|
public static void logFinished(String jobId, String taskId, String taskType,
|
|
|
- long finishTime){
|
|
|
+ long finishTime){
|
|
|
if( ! disableHistory ){
|
|
|
PrintWriter writer = (PrintWriter)openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
|
|
|
if( null != writer ){
|
|
|
JobHistory.log(writer, RecordTypes.Task, new Enum[]{Keys.TASKID, Keys.TASK_TYPE,
|
|
|
- Keys.TASK_STATUS, Keys.FINISH_TIME},
|
|
|
- new String[]{ taskId,taskType, Values.SUCCESS.name(), String.valueOf(finishTime)}) ;
|
|
|
+ Keys.TASK_STATUS, Keys.FINISH_TIME},
|
|
|
+ new String[]{ taskId,taskType, Values.SUCCESS.name(), String.valueOf(finishTime)}) ;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -477,8 +477,8 @@ public class JobHistory {
|
|
|
PrintWriter writer = (PrintWriter)openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
|
|
|
if( null != writer ){
|
|
|
JobHistory.log(writer, RecordTypes.Task, new Enum[]{Keys.TASKID, Keys.TASK_TYPE,
|
|
|
- Keys.TASK_STATUS, Keys.FINISH_TIME, Keys.ERROR},
|
|
|
- new String[]{ taskId, taskType, Values.FAILED.name(), String.valueOf(time) , error}) ;
|
|
|
+ Keys.TASK_STATUS, Keys.FINISH_TIME, Keys.ERROR},
|
|
|
+ new String[]{ taskId, taskType, Values.FAILED.name(), String.valueOf(time) , error}) ;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -500,43 +500,43 @@ public class JobHistory {
|
|
|
* a Map Attempt on a node.
|
|
|
*/
|
|
|
public static class MapAttempt extends TaskAttempt{
|
|
|
- /**
|
|
|
- * Log start time of this map task attempt.
|
|
|
- * @param jobId job id
|
|
|
- * @param taskId task id
|
|
|
- * @param taskAttemptId task attempt id
|
|
|
- * @param startTime start time of task attempt as reported by task tracker.
|
|
|
- * @param hostName host name of the task attempt.
|
|
|
- */
|
|
|
- public static void logStarted(String jobId, String taskId,String taskAttemptId, long startTime, String hostName){
|
|
|
- if( ! disableHistory ){
|
|
|
- PrintWriter writer = (PrintWriter)openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
|
|
|
- if( null != writer ){
|
|
|
- JobHistory.log( writer, RecordTypes.MapAttempt,
|
|
|
- new Enum[]{ Keys.TASK_TYPE, Keys.TASKID,
|
|
|
- Keys.TASK_ATTEMPT_ID, Keys.START_TIME, Keys.HOSTNAME},
|
|
|
- new String[]{Values.MAP.name(), taskId,
|
|
|
- taskAttemptId, String.valueOf(startTime), hostName} ) ;
|
|
|
- }
|
|
|
+ /**
|
|
|
+ * Log start time of this map task attempt.
|
|
|
+ * @param jobId job id
|
|
|
+ * @param taskId task id
|
|
|
+ * @param taskAttemptId task attempt id
|
|
|
+ * @param startTime start time of task attempt as reported by task tracker.
|
|
|
+ * @param hostName host name of the task attempt.
|
|
|
+ */
|
|
|
+ public static void logStarted(String jobId, String taskId,String taskAttemptId, long startTime, String hostName){
|
|
|
+ if( ! disableHistory ){
|
|
|
+ PrintWriter writer = (PrintWriter)openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
|
|
|
+ if( null != writer ){
|
|
|
+ JobHistory.log( writer, RecordTypes.MapAttempt,
|
|
|
+ new Enum[]{ Keys.TASK_TYPE, Keys.TASKID,
|
|
|
+ Keys.TASK_ATTEMPT_ID, Keys.START_TIME, Keys.HOSTNAME},
|
|
|
+ new String[]{Values.MAP.name(), taskId,
|
|
|
+ taskAttemptId, String.valueOf(startTime), hostName} ) ;
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
- /**
|
|
|
- * Log finish time of map task attempt.
|
|
|
- * @param jobId job id
|
|
|
- * @param taskId task id
|
|
|
- * @param taskAttemptId task attempt id
|
|
|
- * @param finishTime finish time
|
|
|
- * @param hostName host name
|
|
|
- */
|
|
|
+ /**
|
|
|
+ * Log finish time of map task attempt.
|
|
|
+ * @param jobId job id
|
|
|
+ * @param taskId task id
|
|
|
+ * @param taskAttemptId task attempt id
|
|
|
+ * @param finishTime finish time
|
|
|
+ * @param hostName host name
|
|
|
+ */
|
|
|
public static void logFinished(String jobId, String taskId, String taskAttemptId, long finishTime, String hostName){
|
|
|
if( ! disableHistory ){
|
|
|
PrintWriter writer = (PrintWriter)openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
|
|
|
if( null != writer ){
|
|
|
JobHistory.log(writer, RecordTypes.MapAttempt,
|
|
|
- new Enum[]{ Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
|
|
|
- Keys.FINISH_TIME, Keys.HOSTNAME},
|
|
|
- new String[]{Values.MAP.name(), taskId, taskAttemptId, Values.SUCCESS.name(),
|
|
|
- String.valueOf(finishTime), hostName} ) ;
|
|
|
+ new Enum[]{ Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
|
|
|
+ Keys.FINISH_TIME, Keys.HOSTNAME},
|
|
|
+ new String[]{Values.MAP.name(), taskId, taskAttemptId, Values.SUCCESS.name(),
|
|
|
+ String.valueOf(finishTime), hostName} ) ;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -550,15 +550,15 @@ public class JobHistory {
|
|
|
* @param error error message if any for this task attempt.
|
|
|
*/
|
|
|
public static void logFailed(String jobId, String taskId, String taskAttemptId,
|
|
|
- long timestamp, String hostName, String error){
|
|
|
+ long timestamp, String hostName, String error){
|
|
|
if( ! disableHistory ){
|
|
|
PrintWriter writer = (PrintWriter)openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
|
|
|
if( null != writer ){
|
|
|
JobHistory.log( writer, RecordTypes.MapAttempt,
|
|
|
- new Enum[]{Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
|
|
|
- Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR},
|
|
|
- new String[]{ Values.MAP.name(), taskId, taskAttemptId, Values.FAILED.name(),
|
|
|
- String.valueOf(timestamp), hostName, error} ) ;
|
|
|
+ new Enum[]{Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
|
|
|
+ Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR},
|
|
|
+ new String[]{ Values.MAP.name(), taskId, taskAttemptId, Values.FAILED.name(),
|
|
|
+ String.valueOf(timestamp), hostName, error} ) ;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -577,18 +577,18 @@ public class JobHistory {
|
|
|
* @param hostName host name
|
|
|
*/
|
|
|
public static void logStarted(String jobId, String taskId, String taskAttemptId,
|
|
|
- long startTime, String hostName){
|
|
|
+ long startTime, String hostName){
|
|
|
if( ! disableHistory ){
|
|
|
PrintWriter writer = (PrintWriter)openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
|
|
|
if( null != writer ){
|
|
|
JobHistory.log( writer, RecordTypes.ReduceAttempt,
|
|
|
- new Enum[]{ Keys.TASK_TYPE, Keys.TASKID,
|
|
|
- Keys.TASK_ATTEMPT_ID, Keys.START_TIME, Keys.HOSTNAME},
|
|
|
- new String[]{Values.REDUCE.name(), taskId,
|
|
|
- taskAttemptId, String.valueOf(startTime), hostName} ) ;
|
|
|
+ new Enum[]{ Keys.TASK_TYPE, Keys.TASKID,
|
|
|
+ Keys.TASK_ATTEMPT_ID, Keys.START_TIME, Keys.HOSTNAME},
|
|
|
+ new String[]{Values.REDUCE.name(), taskId,
|
|
|
+ taskAttemptId, String.valueOf(startTime), hostName} ) ;
|
|
|
}
|
|
|
}
|
|
|
- }
|
|
|
+ }
|
|
|
/**
|
|
|
* Log finished event of this task.
|
|
|
* @param jobId job id
|
|
@@ -599,42 +599,42 @@ public class JobHistory {
|
|
|
* @param finishTime finish time of task
|
|
|
* @param hostName host name where task attempt executed
|
|
|
*/
|
|
|
- public static void logFinished(String jobId, String taskId, String taskAttemptId,
|
|
|
- long shuffleFinished, long sortFinished, long finishTime, String hostName){
|
|
|
- if( ! disableHistory ){
|
|
|
- PrintWriter writer = (PrintWriter)openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
|
|
|
- if( null != writer ){
|
|
|
- JobHistory.log( writer, RecordTypes.ReduceAttempt,
|
|
|
- new Enum[]{ Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
|
|
|
- Keys.SHUFFLE_FINISHED, Keys.SORT_FINISHED, Keys.FINISH_TIME, Keys.HOSTNAME},
|
|
|
- new String[]{Values.REDUCE.name(), taskId, taskAttemptId, Values.SUCCESS.name(),
|
|
|
- String.valueOf(shuffleFinished), String.valueOf(sortFinished),
|
|
|
- String.valueOf(finishTime), hostName} ) ;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- /**
|
|
|
- * Log failed reduce task attempt.
|
|
|
- * @param jobId job id
|
|
|
- * @param taskId task id
|
|
|
- * @param taskAttemptId task attempt id
|
|
|
- * @param timestamp time stamp when task failed
|
|
|
- * @param hostName host name of the task attempt.
|
|
|
- * @param error error message of the task.
|
|
|
- */
|
|
|
- public static void logFailed(String jobId, String taskId,String taskAttemptId, long timestamp,
|
|
|
- String hostName, String error){
|
|
|
- if( ! disableHistory ){
|
|
|
- PrintWriter writer = (PrintWriter)openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
|
|
|
- if( null != writer ){
|
|
|
- JobHistory.log( writer, RecordTypes.ReduceAttempt,
|
|
|
- new Enum[]{ Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID,Keys.TASK_STATUS,
|
|
|
- Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR },
|
|
|
- new String[]{ Values.REDUCE.name(), taskId, taskAttemptId, Values.FAILED.name(),
|
|
|
- String.valueOf(timestamp), hostName, error } ) ;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
+ public static void logFinished(String jobId, String taskId, String taskAttemptId,
|
|
|
+ long shuffleFinished, long sortFinished, long finishTime, String hostName){
|
|
|
+ if( ! disableHistory ){
|
|
|
+ PrintWriter writer = (PrintWriter)openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
|
|
|
+ if( null != writer ){
|
|
|
+ JobHistory.log( writer, RecordTypes.ReduceAttempt,
|
|
|
+ new Enum[]{ Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
|
|
|
+ Keys.SHUFFLE_FINISHED, Keys.SORT_FINISHED, Keys.FINISH_TIME, Keys.HOSTNAME},
|
|
|
+ new String[]{Values.REDUCE.name(), taskId, taskAttemptId, Values.SUCCESS.name(),
|
|
|
+ String.valueOf(shuffleFinished), String.valueOf(sortFinished),
|
|
|
+ String.valueOf(finishTime), hostName} ) ;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * Log failed reduce task attempt.
|
|
|
+ * @param jobId job id
|
|
|
+ * @param taskId task id
|
|
|
+ * @param taskAttemptId task attempt id
|
|
|
+ * @param timestamp time stamp when task failed
|
|
|
+ * @param hostName host name of the task attempt.
|
|
|
+ * @param error error message of the task.
|
|
|
+ */
|
|
|
+ public static void logFailed(String jobId, String taskId,String taskAttemptId, long timestamp,
|
|
|
+ String hostName, String error){
|
|
|
+ if( ! disableHistory ){
|
|
|
+ PrintWriter writer = (PrintWriter)openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
|
|
|
+ if( null != writer ){
|
|
|
+ JobHistory.log( writer, RecordTypes.ReduceAttempt,
|
|
|
+ new Enum[]{ Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID,Keys.TASK_STATUS,
|
|
|
+ Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR },
|
|
|
+ new String[]{ Values.REDUCE.name(), taskId, taskAttemptId, Values.FAILED.name(),
|
|
|
+ String.valueOf(timestamp), hostName, error } ) ;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
/**
|
|
|
* Callback interface for reading back log events from JobHistory. This interface
|
|
@@ -677,12 +677,12 @@ public class JobHistory {
|
|
|
if( lastRan ==0 || (now - lastRan) < ONE_DAY_IN_MS ){
|
|
|
return ;
|
|
|
}
|
|
|
- lastRan = now;
|
|
|
- isRunning = true ;
|
|
|
- // update master Index first
|
|
|
- try{
|
|
|
+ lastRan = now;
|
|
|
+ isRunning = true ;
|
|
|
+ // update master Index first
|
|
|
+ try{
|
|
|
File logFile = new File(
|
|
|
- LOG_DIR + File.separator + MASTER_INDEX_LOG_FILE);
|
|
|
+ LOG_DIR + File.separator + MASTER_INDEX_LOG_FILE);
|
|
|
|
|
|
synchronized(MASTER_INDEX_LOG_FILE){
|
|
|
Map<String, Map<String, JobHistory.JobInfo>> jobTrackersToJobs =
|
|
@@ -728,14 +728,14 @@ public class JobHistory {
|
|
|
}
|
|
|
|
|
|
File[] oldFiles = new File(LOG_DIR).listFiles(new FileFilter(){
|
|
|
- public boolean accept(File file){
|
|
|
- // delete if older than 30 days
|
|
|
- if( now - file.lastModified() > THIRTY_DAYS_IN_MS ){
|
|
|
- return true ;
|
|
|
- }
|
|
|
+ public boolean accept(File file){
|
|
|
+ // delete if older than 30 days
|
|
|
+ if( now - file.lastModified() > THIRTY_DAYS_IN_MS ){
|
|
|
+ return true ;
|
|
|
+ }
|
|
|
return false;
|
|
|
- }
|
|
|
- });
|
|
|
+ }
|
|
|
+ });
|
|
|
for( File f : oldFiles){
|
|
|
f.delete();
|
|
|
LOG.info("Deleting old history file : " + f.getName());
|