|
@@ -92,17 +92,79 @@ public class JobHistory {
|
|
|
public static final int JOB_NAME_TRIM_LENGTH = 50;
|
|
|
private static String JOBTRACKER_UNIQUE_STRING = null;
|
|
|
private static String LOG_DIR = null;
|
|
|
- private static Map<String, ArrayList<PrintWriter>> openJobs =
|
|
|
- new ConcurrentHashMap<String, ArrayList<PrintWriter>>();
|
|
|
private static boolean disableHistory = false;
|
|
|
private static final String SECONDARY_FILE_SUFFIX = ".recover";
|
|
|
private static long jobHistoryBlockSize = 0;
|
|
|
private static String jobtrackerHostname;
|
|
|
+ private static JobHistoryFilesManager fileManager =
|
|
|
+ new JobHistoryFilesManager();
|
|
|
final static FsPermission HISTORY_DIR_PERMISSION =
|
|
|
FsPermission.createImmutable((short) 0750); // rwxr-x---
|
|
|
final static FsPermission HISTORY_FILE_PERMISSION =
|
|
|
FsPermission.createImmutable((short) 0740); // rwxr-----
|
|
|
private static JobConf jtConf;
|
|
|
+ private static Path DONE = null; // folder for completed jobs
|
|
|
+ /**
|
|
|
+ * A class that manages all the files related to a job. For now
|
|
|
+ * - writers : list of open files
|
|
|
+ * - job history filename
|
|
|
+ * - job conf filename
|
|
|
+ */
|
|
|
+ private static class JobHistoryFilesManager {
|
|
|
+ // a private (virtual) folder for all the files related to a running job
|
|
|
+ private static class FilesHolder {
|
|
|
+ ArrayList<PrintWriter> writers = new ArrayList<PrintWriter>();
|
|
|
+ Path historyFilename; // path of job history file
|
|
|
+ Path confFilename; // path of job's conf
|
|
|
+ }
|
|
|
+
|
|
|
+ // cache from job-key to files associated with it.
|
|
|
+ private Map<JobID, FilesHolder> fileCache =
|
|
|
+ new ConcurrentHashMap<JobID, FilesHolder>();
|
|
|
+
|
|
|
+ private FilesHolder getFileHolder(JobID id) {
|
|
|
+ FilesHolder holder = fileCache.get(id);
|
|
|
+ if (holder == null) {
|
|
|
+ holder = new FilesHolder();
|
|
|
+ fileCache.put(id, holder);
|
|
|
+ }
|
|
|
+ return holder;
|
|
|
+ }
|
|
|
+
|
|
|
+ void addWriter(JobID id, PrintWriter writer) {
|
|
|
+ FilesHolder holder = getFileHolder(id);
|
|
|
+ holder.writers.add(writer);
|
|
|
+ }
|
|
|
+
|
|
|
+ void setHistoryFile(JobID id, Path file) {
|
|
|
+ FilesHolder holder = getFileHolder(id);
|
|
|
+ holder.historyFilename = file;
|
|
|
+ }
|
|
|
+
|
|
|
+ void setConfFile(JobID id, Path file) {
|
|
|
+ FilesHolder holder = getFileHolder(id);
|
|
|
+ holder.confFilename = file;
|
|
|
+ }
|
|
|
+
|
|
|
+ ArrayList<PrintWriter> getWriters(JobID id) {
|
|
|
+ FilesHolder holder = fileCache.get(id);
|
|
|
+ return holder == null ? null : holder.writers;
|
|
|
+ }
|
|
|
+
|
|
|
+ Path getHistoryFile(JobID id) {
|
|
|
+ FilesHolder holder = fileCache.get(id);
|
|
|
+ return holder == null ? null : holder.historyFilename;
|
|
|
+ }
|
|
|
+
|
|
|
+ Path getConfFileWriters(JobID id) {
|
|
|
+ FilesHolder holder = fileCache.get(id);
|
|
|
+ return holder == null ? null : holder.confFilename;
|
|
|
+ }
|
|
|
+
|
|
|
+ void purgeJob(JobID id) {
|
|
|
+ fileCache.remove(id);
|
|
|
+ }
|
|
|
+ }
|
|
|
/**
|
|
|
* Record types are identifiers for each line of log in history files.
|
|
|
* A record type appears as the first token in a single line of log.
|
|
@@ -152,6 +214,7 @@ public class JobHistory {
|
|
|
"file:///" + new File(
|
|
|
System.getProperty("hadoop.log.dir")).getAbsolutePath()
|
|
|
+ File.separator + "history");
|
|
|
+ DONE = new Path(LOG_DIR, "done");
|
|
|
JOBTRACKER_UNIQUE_STRING = hostname + "_" +
|
|
|
String.valueOf(jobTrackerStartTime) + "_";
|
|
|
jobtrackerHostname = hostname;
|
|
@@ -169,6 +232,9 @@ public class JobHistory {
|
|
|
conf.getLong("mapred.jobtracker.job.history.block.size",
|
|
|
3 * 1024 * 1024);
|
|
|
jtConf = conf;
|
|
|
+
|
|
|
+ // create the done folder with appropriate permission
|
|
|
+ fs.mkdirs(DONE, HISTORY_DIR_PERMISSION);
|
|
|
} catch(IOException e) {
|
|
|
LOG.error("Failed to initialize JobHistory log file", e);
|
|
|
disableHistory = true;
|
|
@@ -387,6 +453,13 @@ public class JobHistory {
|
|
|
return new Path(LOG_DIR);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Get the history location for completed jobs
|
|
|
+ */
|
|
|
+ static Path getCompletedJobHistoryLocation() {
|
|
|
+ return DONE;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Base class contais utility stuff to manage types key value pairs with enums.
|
|
|
*/
|
|
@@ -639,6 +712,16 @@ public class JobHistory {
|
|
|
*/
|
|
|
public static synchronized String getJobHistoryFileName(JobConf jobConf,
|
|
|
JobID id)
|
|
|
+ throws IOException {
|
|
|
+ return getJobHistoryFileName(jobConf, id, new Path(LOG_DIR));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @param dir The directory where to search.
|
|
|
+ */
|
|
|
+ static synchronized String getJobHistoryFileName(JobConf jobConf,
|
|
|
+ JobID id,
|
|
|
+ Path dir)
|
|
|
throws IOException {
|
|
|
String user = getUserName(jobConf);
|
|
|
String jobName = trimJobName(getJobName(jobConf));
|
|
@@ -672,26 +755,33 @@ public class JobHistory {
|
|
|
}
|
|
|
};
|
|
|
|
|
|
- FileStatus[] statuses = fs.listStatus(new Path(LOG_DIR), filter);
|
|
|
+ FileStatus[] statuses = fs.listStatus(dir, filter);
|
|
|
String filename = null;
|
|
|
if (statuses.length == 0) {
|
|
|
LOG.info("Nothing to recover for job " + id);
|
|
|
} else {
|
|
|
// return filename considering that fact the name can be a
|
|
|
// secondary filename like filename.recover
|
|
|
- filename = decodeJobHistoryFileName(statuses[0].getPath().getName());
|
|
|
- // Remove the '.recover' suffix if it exists
|
|
|
- if (filename.endsWith(jobName + SECONDARY_FILE_SUFFIX)) {
|
|
|
- int newLength = filename.length() - SECONDARY_FILE_SUFFIX.length();
|
|
|
- filename = filename.substring(0, newLength);
|
|
|
- }
|
|
|
- filename = encodeJobHistoryFileName(filename);
|
|
|
+ filename = getPrimaryFilename(statuses[0].getPath().getName(), jobName);
|
|
|
LOG.info("Recovered job history filename for job " + id + " is "
|
|
|
+ filename);
|
|
|
}
|
|
|
return filename;
|
|
|
}
|
|
|
|
|
|
+ // removes all extra extensions from a filename and returns the core/primary
|
|
|
+ // filename
|
|
|
+ private static String getPrimaryFilename(String filename, String jobName)
|
|
|
+ throws IOException{
|
|
|
+ filename = decodeJobHistoryFileName(filename);
|
|
|
+ // Remove the '.recover' suffix if it exists
|
|
|
+ if (filename.endsWith(jobName + SECONDARY_FILE_SUFFIX)) {
|
|
|
+ int newLength = filename.length() - SECONDARY_FILE_SUFFIX.length();
|
|
|
+ filename = filename.substring(0, newLength);
|
|
|
+ }
|
|
|
+ return encodeJobHistoryFileName(filename);
|
|
|
+ }
|
|
|
+
|
|
|
/** Since there was a restart, there should be a master file and
|
|
|
* a recovery file. Once the recovery is complete, the master should be
|
|
|
* deleted as an indication that the recovery file should be treated as the
|
|
@@ -788,33 +878,33 @@ public class JobHistory {
|
|
|
|
|
|
/** Finalize the recovery and make one file in the end.
|
|
|
* This invloves renaming the recover file to the master file.
|
|
|
+ * Note that this api should be invoked only if recovery is involved.
|
|
|
* @param id Job id
|
|
|
* @param conf the job conf
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- static synchronized void finalizeRecovery(JobID id, JobConf conf)
|
|
|
+ static synchronized void finalizeRecovery(JobID id, JobConf conf)
|
|
|
throws IOException {
|
|
|
- String masterLogFileName =
|
|
|
- JobHistory.JobInfo.getJobHistoryFileName(conf, id);
|
|
|
- if (masterLogFileName == null) {
|
|
|
- return;
|
|
|
- }
|
|
|
- Path masterLogPath =
|
|
|
- JobHistory.JobInfo.getJobHistoryLogLocation(masterLogFileName);
|
|
|
- String tmpLogFileName = getSecondaryJobHistoryFile(masterLogFileName);
|
|
|
- Path tmpLogPath =
|
|
|
- JobHistory.JobInfo.getJobHistoryLogLocation(tmpLogFileName);
|
|
|
- if (masterLogPath != null) {
|
|
|
- FileSystem fs = masterLogPath.getFileSystem(conf);
|
|
|
+ Path tmpLogPath = fileManager.getHistoryFile(id);
|
|
|
+ if (tmpLogPath == null) {
|
|
|
+ LOG.debug("No file for job with " + id + " found in cache!");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ String tmpLogFileName = tmpLogPath.getName();
|
|
|
+
|
|
|
+ // get the primary filename from the cached filename
|
|
|
+ String masterLogFileName =
|
|
|
+ getPrimaryFilename(tmpLogFileName, getJobName(conf));
|
|
|
+ Path masterLogPath = new Path(tmpLogPath.getParent(), masterLogFileName);
|
|
|
+
|
|
|
+ // rename the tmp file to the master file. Note that this should be
|
|
|
+ // done only when the file is closed and handles are released.
|
|
|
+ LOG.info("Renaming " + tmpLogFileName + " to " + masterLogFileName);
|
|
|
+ FileSystem fs = tmpLogPath.getFileSystem(jtConf);
|
|
|
+ fs.rename(tmpLogPath, masterLogPath);
|
|
|
+ // update the cache
|
|
|
+ fileManager.setHistoryFile(id, masterLogPath);
|
|
|
|
|
|
- // rename the tmp file to the master file. Note that this should be
|
|
|
- // done only when the file is closed and handles are released.
|
|
|
- if(fs.exists(tmpLogPath)) {
|
|
|
- LOG.info("Renaming " + tmpLogFileName + " to " + masterLogFileName);
|
|
|
- fs.rename(tmpLogPath, masterLogPath);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
// do the same for the user file too
|
|
|
masterLogPath =
|
|
|
JobHistory.JobInfo.getJobHistoryLogLocationForUser(masterLogFileName,
|
|
@@ -823,7 +913,7 @@ public class JobHistory {
|
|
|
JobHistory.JobInfo.getJobHistoryLogLocationForUser(tmpLogFileName,
|
|
|
conf);
|
|
|
if (masterLogPath != null) {
|
|
|
- FileSystem fs = masterLogPath.getFileSystem(conf);
|
|
|
+ fs = masterLogPath.getFileSystem(conf);
|
|
|
if (fs.exists(tmpLogPath)) {
|
|
|
LOG.info("Renaming " + tmpLogFileName + " to " + masterLogFileName
|
|
|
+ " in user directory");
|
|
@@ -846,6 +936,38 @@ public class JobHistory {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
+ * Move the completed job into the completed folder.
|
|
|
+ * This assumes that the jobhistory file is closed and all operations on the
|
|
|
+ * jobhistory file is complete.
|
|
|
+ * This *should* be the last call to jobhistory for a given job.
|
|
|
+ */
|
|
|
+ static void markCompleted(JobID id) throws IOException {
|
|
|
+ Path path = fileManager.getHistoryFile(id);
|
|
|
+ if (path == null) {
|
|
|
+ LOG.info("No file for job-history with " + id + " found in cache!");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ Path newPath = new Path(DONE, path.getName());
|
|
|
+ LOG.info("Moving completed job from " + path + " to " + newPath);
|
|
|
+ FileSystem fs = path.getFileSystem(jtConf);
|
|
|
+ fs.rename(path, newPath);
|
|
|
+
|
|
|
+ Path confPath = fileManager.getConfFileWriters(id);
|
|
|
+ if (confPath == null) {
|
|
|
+ LOG.info("No file for jobconf with " + id + " found in cache!");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ // move the conf too
|
|
|
+ newPath = new Path(DONE, confPath.getName());
|
|
|
+ LOG.info("Moving configuration of completed job from " + confPath
|
|
|
+ + " to " + newPath);
|
|
|
+ fs.rename(confPath, newPath);
|
|
|
+
|
|
|
+ // purge the job from the cache
|
|
|
+ fileManager.purgeJob(id);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
* Log job submitted event to history. Creates a new file in history
|
|
|
* for the job. if history file creation fails, it disables history
|
|
|
* for all other events.
|
|
@@ -885,7 +1007,12 @@ public class JobHistory {
|
|
|
if (logFileName == null) {
|
|
|
logFileName =
|
|
|
encodeJobHistoryFileName(getNewJobHistoryFileName(jobConf, jobId));
|
|
|
-
|
|
|
+ } else {
|
|
|
+ String parts[] = logFileName.split("_");
|
|
|
+ //TODO this is a hack :(
|
|
|
+ // jobtracker-hostname_jobtracker-identifier_
|
|
|
+ String jtUniqueString = parts[0] + "_" + parts[1] + "_";
|
|
|
+ jobUniqueString = jtUniqueString + jobId.toString();
|
|
|
}
|
|
|
} else {
|
|
|
logFileName =
|
|
@@ -900,7 +1027,6 @@ public class JobHistory {
|
|
|
getJobHistoryLogLocationForUser(logFileName, jobConf);
|
|
|
|
|
|
try{
|
|
|
- ArrayList<PrintWriter> writers = new ArrayList<PrintWriter>();
|
|
|
FSDataOutputStream out = null;
|
|
|
PrintWriter writer = null;
|
|
|
|
|
@@ -922,7 +1048,10 @@ public class JobHistory {
|
|
|
fs.getDefaultReplication(),
|
|
|
jobHistoryBlockSize, null);
|
|
|
writer = new PrintWriter(out);
|
|
|
- writers.add(writer);
|
|
|
+ fileManager.addWriter(jobId, writer);
|
|
|
+
|
|
|
+ // cache it ...
|
|
|
+ fileManager.setHistoryFile(jobId, logFile);
|
|
|
}
|
|
|
if (userLogFile != null) {
|
|
|
// Get the actual filename as recoverJobHistoryFile() might return
|
|
@@ -936,11 +1065,10 @@ public class JobHistory {
|
|
|
|
|
|
out = fs.create(userLogFile, true, 4096);
|
|
|
writer = new PrintWriter(out);
|
|
|
- writers.add(writer);
|
|
|
+ fileManager.addWriter(jobId, writer);
|
|
|
}
|
|
|
-
|
|
|
- openJobs.put(jobUniqueString, writers);
|
|
|
|
|
|
+ ArrayList<PrintWriter> writers = fileManager.getWriters(jobId);
|
|
|
// Log the history meta info
|
|
|
JobHistory.MetaInfoManager.logMetaInfo(writers);
|
|
|
|
|
@@ -985,6 +1113,7 @@ public class JobHistory {
|
|
|
if (LOG_DIR != null) {
|
|
|
jobFilePath = new Path(LOG_DIR + File.separator +
|
|
|
jobUniqueString + "_conf.xml");
|
|
|
+ fileManager.setConfFile(jobId, jobFilePath);
|
|
|
}
|
|
|
Path userJobFilePath = null;
|
|
|
if (userLogDir != null) {
|
|
@@ -1018,7 +1147,7 @@ public class JobHistory {
|
|
|
+ jobFilePath + "and" + userJobFilePath );
|
|
|
}
|
|
|
} catch (IOException ioe) {
|
|
|
- LOG.error("Failed to store job conf on the local filesystem ", ioe);
|
|
|
+ LOG.error("Failed to store job conf in the log dir", ioe);
|
|
|
} finally {
|
|
|
if (jobFileOut != null) {
|
|
|
try {
|
|
@@ -1041,8 +1170,7 @@ public class JobHistory {
|
|
|
public static void logInited(JobID jobId, long startTime,
|
|
|
int totalMaps, int totalReduces) {
|
|
|
if (!disableHistory){
|
|
|
- String logFileKey = JOBTRACKER_UNIQUE_STRING + jobId;
|
|
|
- ArrayList<PrintWriter> writer = openJobs.get(logFileKey);
|
|
|
+ ArrayList<PrintWriter> writer = fileManager.getWriters(jobId);
|
|
|
|
|
|
if (null != writer){
|
|
|
JobHistory.log(writer, RecordTypes.Job,
|
|
@@ -1078,8 +1206,7 @@ public class JobHistory {
|
|
|
*/
|
|
|
public static void logStarted(JobID jobId){
|
|
|
if (!disableHistory){
|
|
|
- String logFileKey = JOBTRACKER_UNIQUE_STRING + jobId;
|
|
|
- ArrayList<PrintWriter> writer = openJobs.get(logFileKey);
|
|
|
+ ArrayList<PrintWriter> writer = fileManager.getWriters(jobId);
|
|
|
|
|
|
if (null != writer){
|
|
|
JobHistory.log(writer, RecordTypes.Job,
|
|
@@ -1106,8 +1233,7 @@ public class JobHistory {
|
|
|
Counters counters){
|
|
|
if (!disableHistory){
|
|
|
// close job file for this job
|
|
|
- String logFileKey = JOBTRACKER_UNIQUE_STRING + jobId;
|
|
|
- ArrayList<PrintWriter> writer = openJobs.get(logFileKey);
|
|
|
+ ArrayList<PrintWriter> writer = fileManager.getWriters(jobId);
|
|
|
|
|
|
if (null != writer){
|
|
|
JobHistory.log(writer, RecordTypes.Job,
|
|
@@ -1126,7 +1252,6 @@ public class JobHistory {
|
|
|
for (PrintWriter out : writer) {
|
|
|
out.close();
|
|
|
}
|
|
|
- openJobs.remove(logFileKey);
|
|
|
}
|
|
|
Thread historyCleaner = new Thread(new HistoryCleaner());
|
|
|
historyCleaner.start();
|
|
@@ -1141,8 +1266,7 @@ public class JobHistory {
|
|
|
*/
|
|
|
public static void logFailed(JobID jobid, long timestamp, int finishedMaps, int finishedReduces){
|
|
|
if (!disableHistory){
|
|
|
- String logFileKey = JOBTRACKER_UNIQUE_STRING + jobid;
|
|
|
- ArrayList<PrintWriter> writer = openJobs.get(logFileKey);
|
|
|
+ ArrayList<PrintWriter> writer = fileManager.getWriters(jobid);
|
|
|
|
|
|
if (null != writer){
|
|
|
JobHistory.log(writer, RecordTypes.Job,
|
|
@@ -1152,7 +1276,6 @@ public class JobHistory {
|
|
|
for (PrintWriter out : writer) {
|
|
|
out.close();
|
|
|
}
|
|
|
- openJobs.remove(logFileKey);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -1171,8 +1294,7 @@ public class JobHistory {
|
|
|
public static void logKilled(JobID jobid, long timestamp, int finishedMaps,
|
|
|
int finishedReduces) {
|
|
|
if (!disableHistory) {
|
|
|
- String logFileKey = JOBTRACKER_UNIQUE_STRING + jobid;
|
|
|
- ArrayList<PrintWriter> writer = openJobs.get(logFileKey);
|
|
|
+ ArrayList<PrintWriter> writer = fileManager.getWriters(jobid);
|
|
|
|
|
|
if (null != writer) {
|
|
|
JobHistory.log(writer, RecordTypes.Job, new Keys[] { Keys.JOBID,
|
|
@@ -1183,7 +1305,6 @@ public class JobHistory {
|
|
|
for (PrintWriter out : writer) {
|
|
|
out.close();
|
|
|
}
|
|
|
- openJobs.remove(logFileKey);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -1194,8 +1315,7 @@ public class JobHistory {
|
|
|
*/
|
|
|
public static void logJobPriority(JobID jobid, JobPriority priority){
|
|
|
if (!disableHistory){
|
|
|
- String logFileKey = JOBTRACKER_UNIQUE_STRING + jobid;
|
|
|
- ArrayList<PrintWriter> writer = openJobs.get(logFileKey);
|
|
|
+ ArrayList<PrintWriter> writer = fileManager.getWriters(jobid);
|
|
|
|
|
|
if (null != writer){
|
|
|
JobHistory.log(writer, RecordTypes.Job,
|
|
@@ -1220,8 +1340,7 @@ public class JobHistory {
|
|
|
public static void logJobInfo(JobID jobid, long submitTime, long launchTime)
|
|
|
{
|
|
|
if (!disableHistory){
|
|
|
- String logFileKey = JOBTRACKER_UNIQUE_STRING + jobid;
|
|
|
- ArrayList<PrintWriter> writer = openJobs.get(logFileKey);
|
|
|
+ ArrayList<PrintWriter> writer = fileManager.getWriters(jobid);
|
|
|
|
|
|
if (null != writer){
|
|
|
JobHistory.log(writer, RecordTypes.Job,
|
|
@@ -1252,8 +1371,8 @@ public class JobHistory {
|
|
|
public static void logStarted(TaskID taskId, String taskType,
|
|
|
long startTime, String splitLocations) {
|
|
|
if (!disableHistory){
|
|
|
- ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
|
|
|
- + taskId.getJobID());
|
|
|
+ JobID id = taskId.getJobID();
|
|
|
+ ArrayList<PrintWriter> writer = fileManager.getWriters(id);
|
|
|
|
|
|
if (null != writer){
|
|
|
JobHistory.log(writer, RecordTypes.Task,
|
|
@@ -1274,8 +1393,8 @@ public class JobHistory {
|
|
|
public static void logFinished(TaskID taskId, String taskType,
|
|
|
long finishTime, Counters counters){
|
|
|
if (!disableHistory){
|
|
|
- ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
|
|
|
- + taskId.getJobID());
|
|
|
+ JobID id = taskId.getJobID();
|
|
|
+ ArrayList<PrintWriter> writer = fileManager.getWriters(id);
|
|
|
|
|
|
if (null != writer){
|
|
|
JobHistory.log(writer, RecordTypes.Task,
|
|
@@ -1296,8 +1415,8 @@ public class JobHistory {
|
|
|
*/
|
|
|
public static void logUpdates(TaskID taskId, long finishTime){
|
|
|
if (!disableHistory){
|
|
|
- ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
|
|
|
- + taskId.getJobID());
|
|
|
+ JobID id = taskId.getJobID();
|
|
|
+ ArrayList<PrintWriter> writer = fileManager.getWriters(id);
|
|
|
|
|
|
if (null != writer){
|
|
|
JobHistory.log(writer, RecordTypes.Task,
|
|
@@ -1326,8 +1445,8 @@ public class JobHistory {
|
|
|
String error,
|
|
|
TaskAttemptID failedDueToAttempt){
|
|
|
if (!disableHistory){
|
|
|
- ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
|
|
|
- + taskId.getJobID());
|
|
|
+ JobID id = taskId.getJobID();
|
|
|
+ ArrayList<PrintWriter> writer = fileManager.getWriters(id);
|
|
|
|
|
|
if (null != writer){
|
|
|
String failedAttempt = failedDueToAttempt == null
|
|
@@ -1388,8 +1507,8 @@ public class JobHistory {
|
|
|
String trackerName, int httpPort,
|
|
|
String taskType) {
|
|
|
if (!disableHistory){
|
|
|
- ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
|
|
|
- + taskAttemptId.getJobID());
|
|
|
+ JobID id = taskAttemptId.getJobID();
|
|
|
+ ArrayList<PrintWriter> writer = fileManager.getWriters(id);
|
|
|
|
|
|
if (null != writer){
|
|
|
JobHistory.log(writer, RecordTypes.MapAttempt,
|
|
@@ -1438,8 +1557,8 @@ public class JobHistory {
|
|
|
String stateString,
|
|
|
Counters counter) {
|
|
|
if (!disableHistory){
|
|
|
- ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
|
|
|
- + taskAttemptId.getJobID());
|
|
|
+ JobID id = taskAttemptId.getJobID();
|
|
|
+ ArrayList<PrintWriter> writer = fileManager.getWriters(id);
|
|
|
|
|
|
if (null != writer){
|
|
|
JobHistory.log(writer, RecordTypes.MapAttempt,
|
|
@@ -1487,8 +1606,8 @@ public class JobHistory {
|
|
|
long timestamp, String hostName,
|
|
|
String error, String taskType) {
|
|
|
if (!disableHistory){
|
|
|
- ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
|
|
|
- + taskAttemptId.getJobID());
|
|
|
+ JobID id = taskAttemptId.getJobID();
|
|
|
+ ArrayList<PrintWriter> writer = fileManager.getWriters(id);
|
|
|
|
|
|
if (null != writer){
|
|
|
JobHistory.log(writer, RecordTypes.MapAttempt,
|
|
@@ -1533,8 +1652,8 @@ public class JobHistory {
|
|
|
long timestamp, String hostName,
|
|
|
String error, String taskType) {
|
|
|
if (!disableHistory){
|
|
|
- ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
|
|
|
- + taskAttemptId.getJobID());
|
|
|
+ JobID id = taskAttemptId.getJobID();
|
|
|
+ ArrayList<PrintWriter> writer = fileManager.getWriters(id);
|
|
|
|
|
|
if (null != writer){
|
|
|
JobHistory.log(writer, RecordTypes.MapAttempt,
|
|
@@ -1585,8 +1704,8 @@ public class JobHistory {
|
|
|
int httpPort,
|
|
|
String taskType) {
|
|
|
if (!disableHistory){
|
|
|
- ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
|
|
|
- + taskAttemptId.getJobID());
|
|
|
+ JobID id = taskAttemptId.getJobID();
|
|
|
+ ArrayList<PrintWriter> writer = fileManager.getWriters(id);
|
|
|
|
|
|
if (null != writer){
|
|
|
JobHistory.log(writer, RecordTypes.ReduceAttempt,
|
|
@@ -1640,8 +1759,8 @@ public class JobHistory {
|
|
|
String hostName, String taskType,
|
|
|
String stateString, Counters counter) {
|
|
|
if (!disableHistory){
|
|
|
- ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
|
|
|
- + taskAttemptId.getJobID());
|
|
|
+ JobID id = taskAttemptId.getJobID();
|
|
|
+ ArrayList<PrintWriter> writer = fileManager.getWriters(id);
|
|
|
|
|
|
if (null != writer){
|
|
|
JobHistory.log(writer, RecordTypes.ReduceAttempt,
|
|
@@ -1691,8 +1810,8 @@ public class JobHistory {
|
|
|
String hostName, String error,
|
|
|
String taskType) {
|
|
|
if (!disableHistory){
|
|
|
- ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
|
|
|
- + taskAttemptId.getJobID());
|
|
|
+ JobID id = taskAttemptId.getJobID();
|
|
|
+ ArrayList<PrintWriter> writer = fileManager.getWriters(id);
|
|
|
|
|
|
if (null != writer){
|
|
|
JobHistory.log(writer, RecordTypes.ReduceAttempt,
|
|
@@ -1737,8 +1856,8 @@ public class JobHistory {
|
|
|
String hostName, String error,
|
|
|
String taskType) {
|
|
|
if (!disableHistory){
|
|
|
- ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
|
|
|
- + taskAttemptId.getJobID());
|
|
|
+ JobID id = taskAttemptId.getJobID();
|
|
|
+ ArrayList<PrintWriter> writer = fileManager.getWriters(id);
|
|
|
|
|
|
if (null != writer){
|
|
|
JobHistory.log(writer, RecordTypes.ReduceAttempt,
|
|
@@ -1800,9 +1919,8 @@ public class JobHistory {
|
|
|
lastRan = now;
|
|
|
isRunning = true;
|
|
|
try {
|
|
|
- Path logDir = new Path(LOG_DIR);
|
|
|
- FileSystem fs = logDir.getFileSystem(jtConf);
|
|
|
- FileStatus[] historyFiles = fs.listStatus(logDir);
|
|
|
+ FileSystem fs = DONE.getFileSystem(jtConf);
|
|
|
+ FileStatus[] historyFiles = fs.listStatus(DONE);
|
|
|
// delete if older than 30 days
|
|
|
if (historyFiles != null) {
|
|
|
for (FileStatus f : historyFiles) {
|