@@ -24,7 +24,9 @@ import java.io.FileFilter;
 import java.io.FileOutputStream;
 import java.io.FileReader;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.io.PrintWriter;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -34,7 +36,12 @@ import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.StringUtils;
+
 /**
  * Provides methods for writing to and reading from job history.
  * Job History works in an append mode, JobHistory and its inner classes provide methods
@@ -60,13 +67,14 @@ public class JobHistory {
   private static final String VALUE = "[[^\"]?]+"; // anything but a " in ""
 
   private static final Pattern pattern = Pattern.compile(KEY + "=" + "\"" + VALUE + "\"");
+  private static final int MAX_FILENAME_SIZE = 255;
 
-  public static final String JOBTRACKER_START_TIME = String.valueOf(System.currentTimeMillis());
-  private static final String LOG_DIR = System.getProperty("hadoop.log.dir") + File.separator + "history";
-  public static final String MASTER_INDEX_LOG_FILE = "JobHistory.log";
-
-  private static PrintWriter masterIndex = null;
-  private static Map<String, PrintWriter> openJobs = new HashMap<String, PrintWriter>();
+  public static final String JOBTRACKER_START_TIME =
+    String.valueOf(System.currentTimeMillis());
+  private static String JOBTRACKER_UNIQUE_STRING = null;
+  private static String LOG_DIR = null;
+  private static Map<String, ArrayList<PrintWriter>> openJobs =
+    new HashMap<String, ArrayList<PrintWriter>>();
   private static boolean disableHistory = false;
   /**
    * Record types are identifiers for each line of log in history files.
@@ -101,27 +109,23 @@ public class JobHistory {
   // temp buffer for parsed data
   private static Map<Keys,String> parseBuffer = new HashMap<Keys, String>();
 
-  // init log files
-  static { init(); }
-
   /**
    * Initialize JobHistory files.
    *
    */
-  private static void init(){
+  public static void init(JobConf conf, String hostname){
     if (!disableHistory){
       try{
-        File logDir = new File(LOG_DIR);
-        if (!logDir.exists()){
-          if (!logDir.mkdirs()){
+        LOG_DIR = conf.get("hadoop.job.history.location");
+        JOBTRACKER_UNIQUE_STRING = hostname + "_" +
+          JOBTRACKER_START_TIME + "_";
+        Path logDir = new Path(LOG_DIR);
+        FileSystem fs = logDir.getFileSystem(conf);
+        if (!fs.exists(logDir)){
+          if (!fs.mkdirs(logDir)){
             throw new IOException("Mkdirs failed to create " + logDir.toString());
           }
         }
-      masterIndex = 
-        new PrintWriter(
-          new FileOutputStream(new File(LOG_DIR + File.separator + MASTER_INDEX_LOG_FILE), true));
-      // add jobtracker id = tracker start time
-      log(masterIndex, RecordTypes.Jobtracker, Keys.START_TIME, JOBTRACKER_START_TIME);
       }catch(IOException e){
         LOG.error("Failed to initialize JobHistory log file", e);
         disableHistory = true;
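
With this change, init() no longer runs from a static block; the JobTracker must call it explicitly with a JobConf, and the history directory is resolved through the FileSystem API, so `hadoop.job.history.location` may point at any supported filesystem. A minimal sketch of the new call site, assuming a hypothetical hostname and history URI (neither appears in the patch):

```java
// Sketch only: the URI and hostname are illustrative assumptions.
JobConf conf = new JobConf();
conf.set("hadoop.job.history.location", "hdfs://namenode:9000/mapred/history");
JobHistory.init(conf, "jobtracker.example.com");  // creates the dir if absent
```
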
@@ -129,17 +133,19 @@ public class JobHistory {
       }
     }
 
-  
   /**
-   * Parses history file and invokes Listener.handle() for each line of history. It can 
-   * be used for looking through history files for specific items without having to keep 
-   * whlole history in memory. 
+   * Parses history file and invokes Listener.handle() for
+   * each line of history. It can be used for looking through history
+   * files for specific items without having to keep whole history in memory.
    * @param path path to history file
    * @param l Listener for history events
+   * @param fs FileSystem where history file is present
    * @throws IOException
    */
-  public static void parseHistory(File path, Listener l) throws IOException{
-    BufferedReader reader = new BufferedReader(new FileReader(path));
+  public static void parseHistoryFromFS(String path, Listener l, FileSystem fs)
+  throws IOException{
+    FSDataInputStream in = fs.open(new Path(path));
+    BufferedReader reader = new BufferedReader(new InputStreamReader(in));
     try {
      String line = null; 
      StringBuffer buf = new StringBuffer(); 
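
Since the reader now comes from fs.open(), history files can be scanned wherever they were written, not just on the local disk. A usage sketch, assuming the Listener callback declared elsewhere in this class keeps a handle(RecordTypes, Map<Keys,String>) shape; the path and conf are illustrative:

```java
// Sketch: print the job id of each Job record in one history file.
FileSystem fs = FileSystem.get(conf);
JobHistory.parseHistoryFromFS("/mapred/history/somejobfile",
    new JobHistory.Listener() {
      public void handle(JobHistory.RecordTypes recType,
                         Map<JobHistory.Keys, String> values)
          throws IOException {
        if (recType == JobHistory.RecordTypes.Job) {
          System.out.println("Job record: "
              + values.get(JobHistory.Keys.JOBID));
        }
      }
    }, fs);
```
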
@@ -155,6 +161,7 @@ public class JobHistory {
       try { reader.close(); } catch (IOException ex) {}
     }
   }
+  
   /**
    * Parse a single line of history.
    * @param line
@@ -203,8 +210,8 @@ public class JobHistory {
    * @param values type of log event
    */
 
-  static void log(PrintWriter out, RecordTypes recordType, Keys[] keys, 
-                  String[] values){
+  static void log(ArrayList<PrintWriter> writers, RecordTypes recordType,
+                  Keys[] keys, String[] values) {
     StringBuffer buf = new StringBuffer(recordType.name());
     buf.append(DELIMITER);
     for(int i =0; i< keys.length; i++){
@@ -215,8 +222,10 @@ public class JobHistory {
       buf.append(DELIMITER);
     }
 
-    out.println(buf.toString());
-    out.flush();
+    for (PrintWriter out : writers) {
+      out.println(buf.toString());
+      out.flush();
+    }
   }
 
   /**
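
The log() overload above writes the identical record line to every open writer and flushes each one, which is what keeps the copy under hadoop.job.history.location and the user-requested copy in sync. A standalone sketch of that contract, with StringWriters standing in for the real output streams (nothing here is part of the patch):

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;

public class FanOutSketch {
  public static void main(String[] args) {
    StringWriter a = new StringWriter();
    StringWriter b = new StringWriter();
    ArrayList<PrintWriter> writers = new ArrayList<PrintWriter>();
    writers.add(new PrintWriter(a));
    writers.add(new PrintWriter(b));
    // Same loop shape as the patched log(): fan out, then flush per record
    // so a reader never sees a partially written line.
    String record = "Job JOBID=\"job_0001\" SUBMIT_TIME=\"1190000000000\" .";
    for (PrintWriter out : writers) {
      out.println(record);
      out.flush();
    }
    System.out.println(a.toString().equals(b.toString()));  // prints true
  }
}
```
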
@@ -331,7 +340,8 @@ public class JobHistory {
    * @return the path of the job file on the local file system
    */
   public static String getLocalJobFilePath(String jobId){
-    return LOG_DIR + File.separator + jobId + "_conf.xml";
+    return System.getProperty("hadoop.log.dir") + File.separator +
+             jobId + "_conf.xml";
   }
 
   /**
@@ -347,23 +357,63 @@ public class JobHistory {
                                     String jobConfPath, long submitTime) {
       String jobName = jobConf.getJobName();
       String user = jobConf.getUser();
+      FileSystem fs = null;
+      String userLogDir = null;
+      String jobUniqueString = JOBTRACKER_UNIQUE_STRING + jobId;
+
       if (!disableHistory){
-        synchronized(MASTER_INDEX_LOG_FILE){
-          JobHistory.log(masterIndex, RecordTypes.Job, 
-                         new Keys[]{Keys.JOBID, Keys.JOBNAME, Keys.USER, Keys.SUBMIT_TIME, Keys.JOBCONF }, 
-                         new String[]{jobId, jobName, user, 
-                                      String.valueOf(submitTime), jobConfPath}
-                        );
-        }
         // setup the history log file for this job
-        String logFileName = JOBTRACKER_START_TIME + "_" + jobId; 
-        File logFile = new File(LOG_DIR + File.separator + logFileName);
-        
+        String logFileName = jobUniqueString +
+                             "_" + user + "_" + jobName;
+        if (logFileName.length() > MAX_FILENAME_SIZE) {
+          logFileName = logFileName.substring(0, MAX_FILENAME_SIZE-1);
+        }
+
+        // find user log directory
+        Path outputPath = jobConf.getOutputPath();
+        userLogDir = jobConf.get("hadoop.job.history.user.location",
+                                 outputPath == null ? null : outputPath.toString());
+        if ("none".equals(userLogDir)) {
+          userLogDir = null;
+        }
+        if (userLogDir != null) {
+          userLogDir = userLogDir + "/_logs/history";
+        }
+
+        String logFile = null;
+        String userLogFile = null;
+        if (LOG_DIR != null ) {
+          logFile = LOG_DIR + File.separator + logFileName;
+        }
+        if (userLogDir != null ) {
+          userLogFile = userLogDir + File.separator + logFileName;
+        }
+
         try{
-          PrintWriter writer = new PrintWriter(logFile);
-          openJobs.put(logFileName, writer);
-          // add to writer as well 
-          JobHistory.log(writer, RecordTypes.Job, 
+          ArrayList<PrintWriter> writers = new ArrayList<PrintWriter>();
+          FSDataOutputStream out = null;
+          PrintWriter writer = null;
+
+          if (LOG_DIR != null) {
+            // create output stream for logging in hadoop.job.history.location
+            fs = new Path(LOG_DIR).getFileSystem(jobConf);
+            out = fs.create(new Path(logFile), true, 4096);
+            writer = new PrintWriter(out);
+            writers.add(writer);
+          }
+          if (userLogDir != null) {
+            // create output stream for logging
+            // in hadoop.job.history.user.location
+            fs = new Path(userLogDir).getFileSystem(jobConf);
+            out = fs.create(new Path(userLogFile), true, 4096);
+            writer = new PrintWriter(out);
+            writers.add(writer);
+          }
+
+          openJobs.put(jobUniqueString, writers);
+
+          // add to writers as well
+          JobHistory.log(writers, RecordTypes.Job,
                          new Keys[]{Keys.JOBID, Keys.JOBNAME, Keys.USER, Keys.SUBMIT_TIME, Keys.JOBCONF }, 
                          new String[]{jobId, jobName, user, 
                                       String.valueOf(submitTime) , jobConfPath}
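
The per-job file name now embeds the tracker identity plus user and job name, truncated to MAX_FILENAME_SIZE so a long job name cannot exceed common filesystem name limits. Roughly, with illustrative values (the real pieces come from init() and the JobConf):

```java
// Illustrative values only; hostname, start time, user and job name are
// assumptions for the sake of the example.
String jobUniqueString = "jobtracker.example.com" + "_"
    + "1190000000000" + "_" + "job_0001";
String logFileName = jobUniqueString + "_" + "alice" + "_" + "wordcount";
if (logFileName.length() > 255) {                  // MAX_FILENAME_SIZE
  logFileName = logFileName.substring(0, 255 - 1);
}
// -> jobtracker.example.com_1190000000000_job_0001_alice_wordcount
```
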
@@ -374,7 +424,7 @@ public class JobHistory {
           disableHistory = true; 
         }
       }
-      /* Storing the job conf on the local file system */
+      // Always store job conf on local file system 
       String localJobFilePath = JobInfo.getLocalJobFilePath(jobId); 
       File localJobFile = new File(localJobFilePath);
       FileOutputStream jobOut = null;
@@ -393,10 +443,53 @@ public class JobHistory {
            jobOut.close();
          } catch (IOException ie) {
            LOG.info("Failed to close the job configuration file " 
-                     + StringUtils.stringifyException(ie));
+                   + StringUtils.stringifyException(ie));
          }
        }
      }
+
+      /* Storing the job conf on the log dir */
+      Path jobFilePath = null;
+      if (LOG_DIR != null) {
+        jobFilePath = new Path(LOG_DIR + File.separator +
+                               jobUniqueString + "_conf.xml");
+      }
+      Path userJobFilePath = null;
+      if (userLogDir != null) {
+        userJobFilePath = new Path(userLogDir + File.separator +
+                                   jobUniqueString + "_conf.xml");
+      }
+      FSDataOutputStream jobFileOut = null;
+      try {
+        if (LOG_DIR != null) {
+          fs = new Path(LOG_DIR).getFileSystem(jobConf);
+          if (!fs.exists(jobFilePath)) {
+            jobFileOut = fs.create(jobFilePath);
+            jobConf.write(jobFileOut);
+            jobFileOut.close();
+          }
+        }
+        if (userLogDir != null) {
+          fs = new Path(userLogDir).getFileSystem(jobConf);
+          jobFileOut = fs.create(userJobFilePath);
+          jobConf.write(jobFileOut);
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Job conf for " + jobId + " stored at "
+                    + jobFilePath + " and " + userJobFilePath);
+        }
+      } catch (IOException ioe) {
+        LOG.error("Failed to store job conf in the history log dir ", ioe);
+      } finally {
+        if (jobFileOut != null) {
+          try {
+            jobFileOut.close();
+          } catch (IOException ie) {
+            LOG.info("Failed to close the job configuration file " 
+                     + StringUtils.stringifyException(ie));
+          }
+        }
+      }
     }
     /**
      * Logs launch time of job. 
@@ -407,16 +500,9 @@ public class JobHistory {
      */
     public static void logStarted(String jobId, long startTime, int totalMaps, int totalReduces){
       if (!disableHistory){
-        synchronized(MASTER_INDEX_LOG_FILE){
-          JobHistory.log(masterIndex, RecordTypes.Job, 
-              new Keys[] {Keys.JOBID, Keys.LAUNCH_TIME, Keys.TOTAL_MAPS, Keys.TOTAL_REDUCES },
-              new String[] {jobId,  String.valueOf(startTime), 
-                            String.valueOf(totalMaps), String.valueOf(totalReduces) }); 
-        }
-
-      String logFileName = JOBTRACKER_START_TIME + "_" + jobId; 
-      PrintWriter writer = openJobs.get(logFileName); 
-
+      String logFileKey = JOBTRACKER_UNIQUE_STRING + jobId; 
+      ArrayList<PrintWriter> writer = openJobs.get(logFileKey); 
+
       if (null != writer){
         JobHistory.log(writer, RecordTypes.Job,
             new Keys[] {Keys.JOBID, Keys.LAUNCH_TIME, Keys.TOTAL_MAPS, Keys.TOTAL_REDUCES },
@@ -439,20 +525,10 @@ public class JobHistory {
                                  int failedMaps, int failedReduces,
                                  Counters counters){
       if (!disableHistory){
-        synchronized(MASTER_INDEX_LOG_FILE){
-          JobHistory.log(masterIndex, RecordTypes.Job,          
-                         new Keys[] {Keys.JOBID, Keys.FINISH_TIME, 
-                                     Keys.JOB_STATUS, Keys.FINISHED_MAPS, 
-                                     Keys.FINISHED_REDUCES},
-                         new String[] {jobId, "" + finishTime, 
-                                       Values.SUCCESS.name(), 
-                                       String.valueOf(finishedMaps), 
-                                       String.valueOf(finishedReduces)});
-        }
-
       // close job file for this job
-      String logFileName = JOBTRACKER_START_TIME + "_" + jobId; 
-      PrintWriter writer = openJobs.get(logFileName); 
+      String logFileKey = JOBTRACKER_UNIQUE_STRING + jobId; 
+      ArrayList<PrintWriter> writer = openJobs.get(logFileKey); 
+
       if (null != writer){
         JobHistory.log(writer, RecordTypes.Job,          
                        new Keys[] {Keys.JOBID, Keys.FINISH_TIME, 
@@ -467,8 +543,10 @@ public class JobHistory {
                                      String.valueOf(failedMaps), 
                                      String.valueOf(failedReduces),
                                      counters.makeCompactString()});
-        writer.close();
-        openJobs.remove(logFileName);
+        for (PrintWriter out : writer) {
+          out.close();
+        }
+        openJobs.remove(logFileKey);
       }
       Thread historyCleaner = new Thread(new HistoryCleaner());
       historyCleaner.start();
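
Job completion now closes every writer in the fan-out list before dropping the openJobs entry; the key mirrors the one built in logSubmitted. The lifecycle, reduced to a sketch (names mirror the patch, but the map is private to JobHistory, so this is illustrative rather than callable as written):

```java
// Sketch of the openJobs bookkeeping shared by logFinished/logFailed.
String logFileKey = JOBTRACKER_UNIQUE_STRING + jobId;  // same key as logSubmitted
ArrayList<PrintWriter> writers = openJobs.get(logFileKey);
if (writers != null) {
  for (PrintWriter out : writers) {
    out.close();               // releases both history copies
  }
  openJobs.remove(logFileKey); // later log calls for this job become no-ops
}
```
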
@@ -483,21 +561,18 @@ public class JobHistory {
      */
     public static void logFailed(String jobid, long timestamp, int finishedMaps, int finishedReduces){
       if (!disableHistory){
-        synchronized(MASTER_INDEX_LOG_FILE){
-          JobHistory.log(masterIndex, RecordTypes.Job,
-                         new Keys[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES },
-                         new String[] {jobid, String.valueOf(timestamp), Values.FAILED.name(), String.valueOf(finishedMaps), 
-                                       String.valueOf(finishedReduces)});
-        }
-      String logFileName = JOBTRACKER_START_TIME + "_" + jobid; 
-      PrintWriter writer = openJobs.get(logFileName); 
+      String logFileKey = JOBTRACKER_UNIQUE_STRING + jobid; 
+      ArrayList<PrintWriter> writer = openJobs.get(logFileKey); 
+
       if (null != writer){
         JobHistory.log(writer, RecordTypes.Job,
                        new Keys[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES },
                        new String[] {jobid, String.valueOf(timestamp), Values.FAILED.name(), String.valueOf(finishedMaps), 
                                      String.valueOf(finishedReduces)});
-      writer.close();
-      openJobs.remove(logFileName);
+        for (PrintWriter out : writer) {
+          out.close();
+        }
+        openJobs.remove(logFileKey);
       }
     }
   }
@@ -520,7 +595,9 @@ public class JobHistory {
     public static void logStarted(String jobId, String taskId, String taskType, 
                                   long startTime){
       if (!disableHistory){
-        PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId); 
+        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
+                                                     + jobId);
+
         if (null != writer){
           JobHistory.log(writer, RecordTypes.Task, 
                          new Keys[]{Keys.TASKID, Keys.TASK_TYPE , Keys.START_TIME},
@@ -538,7 +615,9 @@ public class JobHistory {
     public static void logFinished(String jobId, String taskId, String taskType, 
                                    long finishTime, Counters counters){
       if (!disableHistory){
-        PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId); 
+        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
+                                                     + jobId);
+
         if (null != writer){
           JobHistory.log(writer, RecordTypes.Task, 
                          new Keys[]{Keys.TASKID, Keys.TASK_TYPE,
@@ -560,7 +639,9 @@ public class JobHistory {
      */
     public static void logFailed(String jobId, String taskId, String taskType, long time, String error){
       if (!disableHistory){
-        PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId); 
+        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
+                                                     + jobId);
+
         if (null != writer){
           JobHistory.log(writer, RecordTypes.Task, 
                          new Keys[]{Keys.TASKID, Keys.TASK_TYPE,
@@ -597,7 +678,9 @@ public class JobHistory {
      */
     public static void logStarted(String jobId, String taskId, String taskAttemptId, long startTime, String hostName){
       if (!disableHistory){
-        PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId); 
+        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
+                                                     + jobId);
+
         if (null != writer){
           JobHistory.log(writer, RecordTypes.MapAttempt, 
                          new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
@@ -620,7 +703,9 @@ public class JobHistory {
                                    String taskAttemptId, long finishTime, 
                                    String hostName){
       if (!disableHistory){
-        PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId); 
+        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
+                                                     + jobId);
+
         if (null != writer){
           JobHistory.log(writer, RecordTypes.MapAttempt, 
                          new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
@@ -644,7 +729,9 @@ public class JobHistory {
     public static void logFailed(String jobId, String taskId, String taskAttemptId, 
                                  long timestamp, String hostName, String error){
       if (!disableHistory){
-        PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId); 
+        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
+                                                     + jobId);
+
         if (null != writer){
           JobHistory.log(writer, RecordTypes.MapAttempt, 
                          new Keys[]{Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
@@ -666,7 +753,9 @@ public class JobHistory {
     public static void logKilled(String jobId, String taskId, String taskAttemptId, 
                                  long timestamp, String hostName, String error){
       if (!disableHistory){
-        PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId); 
+        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
+                                                     + jobId);
+
         if (null != writer){
           JobHistory.log(writer, RecordTypes.MapAttempt, 
                          new Keys[]{Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
@@ -693,7 +782,9 @@ public class JobHistory {
     public static void logStarted(String jobId, String taskId, String taskAttemptId, 
                                   long startTime, String hostName){
       if (!disableHistory){
-        PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId); 
+        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
+                                                     + jobId);
+
         if (null != writer){
           JobHistory.log(writer, RecordTypes.ReduceAttempt, 
                          new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
@@ -718,7 +809,9 @@ public class JobHistory {
                                    long sortFinished, long finishTime, 
                                    String hostName){
       if (!disableHistory){
-        PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId); 
+        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
+                                                     + jobId);
+
         if (null != writer){
           JobHistory.log(writer, RecordTypes.ReduceAttempt, 
                          new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
@@ -743,7 +836,9 @@ public class JobHistory {
     public static void logFailed(String jobId, String taskId, String taskAttemptId, long timestamp, 
                                  String hostName, String error){
       if (!disableHistory){
-        PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId); 
+        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
+                                                     + jobId);
+
         if (null != writer){
           JobHistory.log(writer, RecordTypes.ReduceAttempt, 
                          new Keys[]{ Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
@@ -765,7 +860,9 @@ public class JobHistory {
     public static void logKilled(String jobId, String taskId, String taskAttemptId, long timestamp, 
                                  String hostName, String error){
       if (!disableHistory){
-        PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId); 
+        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
+                                                     + jobId);
+
         if (null != writer){
           JobHistory.log(writer, RecordTypes.ReduceAttempt, 
                          new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
@@ -822,61 +919,6 @@ public class JobHistory {
       }
       lastRan = now;
       isRunning = true;
-      // update master Index first
-      try{
-        File logFile = new File(
-                                LOG_DIR + File.separator + MASTER_INDEX_LOG_FILE);
-
-        synchronized(MASTER_INDEX_LOG_FILE){
-          Map<String, Map<String, JobHistory.JobInfo>> jobTrackersToJobs = 
-            DefaultJobHistoryParser.parseMasterIndex(logFile);
-
-          // find job that started more than one month back and remove them
-          // for jobtracker instances which dont have a job in past one month 
-          // remove the jobtracker start timestamp as well. 
-          Iterator<Map<String, JobHistory.JobInfo>> jobTrackerItr = 
-            jobTrackersToJobs.values().iterator();
-          while (jobTrackerItr.hasNext()) {
-            Map<String, JobHistory.JobInfo> jobs = jobTrackerItr.next();
-            Iterator<Map.Entry<String, JobHistory.JobInfo>> jobItr = 
-              jobs.entrySet().iterator();
-            while (jobItr.hasNext()) {
-              Map.Entry<String, JobHistory.JobInfo> item = jobItr.next();
-              if (now - item.getValue().getLong(Keys.SUBMIT_TIME) > 
-                  THIRTY_DAYS_IN_MS) {
-                jobItr.remove();
-              }
-            }
-            if (jobs.size() == 0){
-              jobTrackerItr.remove();
-            }
-          }
-          masterIndex.close();
-          masterIndex = new PrintWriter(logFile);
-          // delete old history and write back to a new file 
-          for (Map.Entry<String, Map<String, JobHistory.JobInfo>> entry : 
-                jobTrackersToJobs.entrySet()) {
-            String jobTrackerId = entry.getKey();
-            Map<String, JobHistory.JobInfo> jobs = entry.getValue();
-
-
-            log(masterIndex, RecordTypes.Jobtracker, Keys.START_TIME, jobTrackerId);
-
-            for(JobHistory.JobInfo job : jobs.values()){
-              Map<Keys, String> values = job.getValues();
-
-              log(masterIndex, RecordTypes.Job, 
-                  values.keySet().toArray(new Keys[0]), 
-                  values.values().toArray(new String[0]));
-
-            }
-            masterIndex.flush();
-          }
-        }
-      }catch(IOException e){
-        LOG.error("Failed loading history log for cleanup", e);
-      }
-
       File[] oldFiles = new File(LOG_DIR).listFiles(new FileFilter(){
           public boolean accept(File file){
             // delete if older than 30 days
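
The section ends inside the FileFilter that prunes old history; with the master-index bookkeeping removed above, cleanup becomes purely age-based. A sketch of how the accept() check plausibly completes, assuming it compares file modification time against the THIRTY_DAYS_IN_MS constant referenced in the removed block (the full body is not shown in this excerpt):

```java
// Assumed shape of the age check; `now` is set earlier in run().
public boolean accept(File file) {
  // delete if older than 30 days
  return now - file.lastModified() > THIRTY_DAYS_IN_MS;
}
```
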