|
@@ -30,14 +30,19 @@ import java.net.URLDecoder;
|
|
import java.net.URLEncoder;
|
|
import java.net.URLEncoder;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
import java.util.HashMap;
|
|
import java.util.HashMap;
|
|
|
|
+import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
import java.util.TreeMap;
|
|
import java.util.TreeMap;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
+import java.util.concurrent.LinkedBlockingQueue;
|
|
|
|
+import java.util.concurrent.ThreadPoolExecutor;
|
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
import java.util.regex.Matcher;
|
|
import java.util.regex.Matcher;
|
|
import java.util.regex.Pattern;
|
|
import java.util.regex.Pattern;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.fs.FSDataInputStream;
|
|
import org.apache.hadoop.fs.FSDataInputStream;
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
@@ -101,6 +106,8 @@ public class JobHistory {
|
|
FsPermission.createImmutable((short) 0750); // rwxr-x---
|
|
FsPermission.createImmutable((short) 0750); // rwxr-x---
|
|
final static FsPermission HISTORY_FILE_PERMISSION =
|
|
final static FsPermission HISTORY_FILE_PERMISSION =
|
|
FsPermission.createImmutable((short) 0740); // rwxr-----
|
|
FsPermission.createImmutable((short) 0740); // rwxr-----
|
|
|
|
+ private static FileSystem LOGDIR_FS; // log dir filesystem
|
|
|
|
+ private static FileSystem DONEDIR_FS; // Done dir filesystem
|
|
private static JobConf jtConf;
|
|
private static JobConf jtConf;
|
|
private static Path DONE = null; // folder for completed jobs
|
|
private static Path DONE = null; // folder for completed jobs
|
|
/**
|
|
/**
|
|
@@ -125,11 +132,23 @@ public class JobHistory {
|
|
Path historyFilename; // path of job history file
|
|
Path historyFilename; // path of job history file
|
|
Path confFilename; // path of job's conf
|
|
Path confFilename; // path of job's conf
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ private ThreadPoolExecutor executor = null;
|
|
|
|
+ private final Configuration conf;
|
|
|
|
+
|
|
// cache from job-key to files associated with it.
|
|
// cache from job-key to files associated with it.
|
|
private Map<JobID, FilesHolder> fileCache =
|
|
private Map<JobID, FilesHolder> fileCache =
|
|
new ConcurrentHashMap<JobID, FilesHolder>();
|
|
new ConcurrentHashMap<JobID, FilesHolder>();
|
|
|
|
|
|
|
|
+ JobHistoryFilesManager(Configuration conf) throws IOException {
|
|
|
|
+ this.conf = conf;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ void start() {
|
|
|
|
+ executor = new ThreadPoolExecutor(1, 3, 1,
|
|
|
|
+ TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
|
|
|
|
+ }
|
|
|
|
+
|
|
private FilesHolder getFileHolder(JobID id) {
|
|
private FilesHolder getFileHolder(JobID id) {
|
|
FilesHolder holder = fileCache.get(id);
|
|
FilesHolder holder = fileCache.get(id);
|
|
if (holder == null) {
|
|
if (holder == null) {
|
|
@@ -172,6 +191,33 @@ public class JobHistory {
|
|
void purgeJob(JobID id) {
|
|
void purgeJob(JobID id) {
|
|
fileCache.remove(id);
|
|
fileCache.remove(id);
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ void moveToDone(final JobID id, final List<Path> paths) {
|
|
|
|
+ executor.execute(new Runnable() {
|
|
|
|
+
|
|
|
|
+ public void run() {
|
|
|
|
+ //move the files to DONE folder
|
|
|
|
+ try {
|
|
|
|
+ for (Path path : paths) {
|
|
|
|
+ //check if path exists, in case of retries it may not exist
|
|
|
|
+ if (LOGDIR_FS.exists(path)) {
|
|
|
|
+ LOG.info("Moving " + path.toString() + " to " +
|
|
|
|
+ DONE.toString());
|
|
|
|
+ DONEDIR_FS.moveFromLocalFile(path, DONE);
|
|
|
|
+ DONEDIR_FS.setPermission(new Path(DONE, path.getName()),
|
|
|
|
+ new FsPermission(HISTORY_FILE_PERMISSION));
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ //purge the job from the cache
|
|
|
|
+ fileManager.purgeJob(id);
|
|
|
|
+ } catch (Throwable e) {
|
|
|
|
+ LOG.error("Unable to move history file to DONE folder.", e);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ });
|
|
|
|
+ }
|
|
}
|
|
}
|
|
/**
|
|
/**
|
|
* Record types are identifiers for each line of log in history files.
|
|
* Record types are identifiers for each line of log in history files.
|
|
@@ -222,14 +268,13 @@ public class JobHistory {
|
|
"file:///" + new File(
|
|
"file:///" + new File(
|
|
System.getProperty("hadoop.log.dir")).getAbsolutePath()
|
|
System.getProperty("hadoop.log.dir")).getAbsolutePath()
|
|
+ File.separator + "history");
|
|
+ File.separator + "history");
|
|
- DONE = new Path(LOG_DIR, "done");
|
|
|
|
JOBTRACKER_UNIQUE_STRING = hostname + "_" +
|
|
JOBTRACKER_UNIQUE_STRING = hostname + "_" +
|
|
String.valueOf(jobTrackerStartTime) + "_";
|
|
String.valueOf(jobTrackerStartTime) + "_";
|
|
jobtrackerHostname = hostname;
|
|
jobtrackerHostname = hostname;
|
|
Path logDir = new Path(LOG_DIR);
|
|
Path logDir = new Path(LOG_DIR);
|
|
- FileSystem fs = logDir.getFileSystem(conf);
|
|
|
|
- if (!fs.exists(logDir)){
|
|
|
|
- if (!fs.mkdirs(logDir, new FsPermission(HISTORY_DIR_PERMISSION))) {
|
|
|
|
|
|
+ LOGDIR_FS = logDir.getFileSystem(conf);
|
|
|
|
+ if (!LOGDIR_FS.exists(logDir)){
|
|
|
|
+ if (!LOGDIR_FS.mkdirs(logDir, new FsPermission(HISTORY_DIR_PERMISSION))) {
|
|
throw new IOException("Mkdirs failed to create " + logDir.toString());
|
|
throw new IOException("Mkdirs failed to create " + logDir.toString());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -241,11 +286,39 @@ public class JobHistory {
|
|
3 * 1024 * 1024);
|
|
3 * 1024 * 1024);
|
|
jtConf = conf;
|
|
jtConf = conf;
|
|
|
|
|
|
- // create the done folder with appropriate permission
|
|
|
|
- fs.mkdirs(DONE, HISTORY_DIR_PERMISSION);
|
|
|
|
-
|
|
|
|
// initialize the file manager
|
|
// initialize the file manager
|
|
- fileManager = new JobHistoryFilesManager();
|
|
|
|
|
|
+ fileManager = new JobHistoryFilesManager(conf);
|
|
|
|
+ } catch(IOException e) {
|
|
|
|
+ LOG.error("Failed to initialize JobHistory log file", e);
|
|
|
|
+ disableHistory = true;
|
|
|
|
+ }
|
|
|
|
+ return !(disableHistory);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ static boolean initDone(JobConf conf, FileSystem fs){
|
|
|
|
+ try {
|
|
|
|
+ //if completed job history location is set, use that
|
|
|
|
+ String doneLocation = conf.
|
|
|
|
+ get("mapred.job.tracker.history.completed.location");
|
|
|
|
+ if (doneLocation != null) {
|
|
|
|
+ DONE = fs.makeQualified(new Path(doneLocation));
|
|
|
|
+ DONEDIR_FS = fs;
|
|
|
|
+ } else {
|
|
|
|
+ DONE = new Path(LOG_DIR, "done");
|
|
|
|
+ DONEDIR_FS = LOGDIR_FS;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ //If not already present create the done folder with appropriate
|
|
|
|
+ //permission
|
|
|
|
+ if (!DONEDIR_FS.exists(DONE)) {
|
|
|
|
+ LOG.info("Creating DONE folder at "+ DONE);
|
|
|
|
+ if (! DONEDIR_FS.mkdirs(DONE,
|
|
|
|
+ new FsPermission(HISTORY_DIR_PERMISSION))) {
|
|
|
|
+ throw new IOException("Mkdirs failed to create " + DONE.toString());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ fileManager.start();
|
|
} catch(IOException e) {
|
|
} catch(IOException e) {
|
|
LOG.error("Failed to initialize JobHistory log file", e);
|
|
LOG.error("Failed to initialize JobHistory log file", e);
|
|
disableHistory = true;
|
|
disableHistory = true;
|
|
@@ -253,6 +326,7 @@ public class JobHistory {
|
|
return !(disableHistory);
|
|
return !(disableHistory);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Manages job-history's meta information such as version etc.
|
|
* Manages job-history's meta information such as version etc.
|
|
* Helps in logging version information to the job-history and recover
|
|
* Helps in logging version information to the job-history and recover
|
|
@@ -724,20 +798,26 @@ public class JobHistory {
|
|
public static synchronized String getJobHistoryFileName(JobConf jobConf,
|
|
public static synchronized String getJobHistoryFileName(JobConf jobConf,
|
|
JobID id)
|
|
JobID id)
|
|
throws IOException {
|
|
throws IOException {
|
|
- return getJobHistoryFileName(jobConf, id, new Path(LOG_DIR));
|
|
|
|
|
|
+ return getJobHistoryFileName(jobConf, id, new Path(LOG_DIR), LOGDIR_FS);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ static synchronized String getDoneJobHistoryFileName(JobConf jobConf,
|
|
|
|
+ JobID id) throws IOException {
|
|
|
|
+ if (DONE == null) {
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
|
|
+ return getJobHistoryFileName(jobConf, id, DONE, DONEDIR_FS);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
* @param dir The directory where to search.
|
|
* @param dir The directory where to search.
|
|
*/
|
|
*/
|
|
- static synchronized String getJobHistoryFileName(JobConf jobConf,
|
|
|
|
- JobID id,
|
|
|
|
- Path dir)
|
|
|
|
|
|
+ private static synchronized String getJobHistoryFileName(JobConf jobConf,
|
|
|
|
+ JobID id, Path dir, FileSystem fs)
|
|
throws IOException {
|
|
throws IOException {
|
|
String user = getUserName(jobConf);
|
|
String user = getUserName(jobConf);
|
|
String jobName = trimJobName(getJobName(jobConf));
|
|
String jobName = trimJobName(getJobName(jobConf));
|
|
|
|
|
|
- FileSystem fs = new Path(LOG_DIR).getFileSystem(jobConf);
|
|
|
|
if (LOG_DIR == null) {
|
|
if (LOG_DIR == null) {
|
|
return null;
|
|
return null;
|
|
}
|
|
}
|
|
@@ -805,9 +885,8 @@ public class JobHistory {
|
|
throws IOException {
|
|
throws IOException {
|
|
Path logPath = JobHistory.JobInfo.getJobHistoryLogLocation(fileName);
|
|
Path logPath = JobHistory.JobInfo.getJobHistoryLogLocation(fileName);
|
|
if (logPath != null) {
|
|
if (logPath != null) {
|
|
- FileSystem fs = logPath.getFileSystem(conf);
|
|
|
|
LOG.info("Deleting job history file " + logPath.getName());
|
|
LOG.info("Deleting job history file " + logPath.getName());
|
|
- fs.delete(logPath, false);
|
|
|
|
|
|
+ LOGDIR_FS.delete(logPath, false);
|
|
}
|
|
}
|
|
// do the same for the user file too
|
|
// do the same for the user file too
|
|
logPath = JobHistory.JobInfo.getJobHistoryLogLocationForUser(fileName,
|
|
logPath = JobHistory.JobInfo.getJobHistoryLogLocationForUser(fileName,
|
|
@@ -835,25 +914,24 @@ public class JobHistory {
|
|
Path logFilePath)
|
|
Path logFilePath)
|
|
throws IOException {
|
|
throws IOException {
|
|
Path ret;
|
|
Path ret;
|
|
- FileSystem fs = logFilePath.getFileSystem(conf);
|
|
|
|
String logFileName = logFilePath.getName();
|
|
String logFileName = logFilePath.getName();
|
|
String tmpFilename = getSecondaryJobHistoryFile(logFileName);
|
|
String tmpFilename = getSecondaryJobHistoryFile(logFileName);
|
|
Path logDir = logFilePath.getParent();
|
|
Path logDir = logFilePath.getParent();
|
|
Path tmpFilePath = new Path(logDir, tmpFilename);
|
|
Path tmpFilePath = new Path(logDir, tmpFilename);
|
|
- if (fs.exists(logFilePath)) {
|
|
|
|
|
|
+ if (LOGDIR_FS.exists(logFilePath)) {
|
|
LOG.info(logFileName + " exists!");
|
|
LOG.info(logFileName + " exists!");
|
|
- if (fs.exists(tmpFilePath)) {
|
|
|
|
|
|
+ if (LOGDIR_FS.exists(tmpFilePath)) {
|
|
LOG.info("Deleting " + tmpFilename
|
|
LOG.info("Deleting " + tmpFilename
|
|
+ " and using " + logFileName + " for recovery.");
|
|
+ " and using " + logFileName + " for recovery.");
|
|
- fs.delete(tmpFilePath, false);
|
|
|
|
|
|
+ LOGDIR_FS.delete(tmpFilePath, false);
|
|
}
|
|
}
|
|
ret = tmpFilePath;
|
|
ret = tmpFilePath;
|
|
} else {
|
|
} else {
|
|
LOG.info(logFileName + " doesnt exist! Using "
|
|
LOG.info(logFileName + " doesnt exist! Using "
|
|
+ tmpFilename + " for recovery.");
|
|
+ tmpFilename + " for recovery.");
|
|
- if (fs.exists(tmpFilePath)) {
|
|
|
|
|
|
+ if (LOGDIR_FS.exists(tmpFilePath)) {
|
|
LOG.info("Renaming " + tmpFilename + " to " + logFileName);
|
|
LOG.info("Renaming " + tmpFilename + " to " + logFileName);
|
|
- fs.rename(tmpFilePath, logFilePath);
|
|
|
|
|
|
+ LOGDIR_FS.rename(tmpFilePath, logFilePath);
|
|
ret = tmpFilePath;
|
|
ret = tmpFilePath;
|
|
} else {
|
|
} else {
|
|
ret = logFilePath;
|
|
ret = logFilePath;
|
|
@@ -863,7 +941,7 @@ public class JobHistory {
|
|
// do the same for the user files too
|
|
// do the same for the user files too
|
|
logFilePath = getJobHistoryLogLocationForUser(logFileName, conf);
|
|
logFilePath = getJobHistoryLogLocationForUser(logFileName, conf);
|
|
if (logFilePath != null) {
|
|
if (logFilePath != null) {
|
|
- fs = logFilePath.getFileSystem(conf);
|
|
|
|
|
|
+ FileSystem fs = logFilePath.getFileSystem(conf);
|
|
logDir = logFilePath.getParent();
|
|
logDir = logFilePath.getParent();
|
|
tmpFilePath = new Path(logDir, tmpFilename);
|
|
tmpFilePath = new Path(logDir, tmpFilename);
|
|
if (fs.exists(logFilePath)) {
|
|
if (fs.exists(logFilePath)) {
|
|
@@ -911,8 +989,7 @@ public class JobHistory {
|
|
// rename the tmp file to the master file. Note that this should be
|
|
// rename the tmp file to the master file. Note that this should be
|
|
// done only when the file is closed and handles are released.
|
|
// done only when the file is closed and handles are released.
|
|
LOG.info("Renaming " + tmpLogFileName + " to " + masterLogFileName);
|
|
LOG.info("Renaming " + tmpLogFileName + " to " + masterLogFileName);
|
|
- FileSystem fs = tmpLogPath.getFileSystem(jtConf);
|
|
|
|
- fs.rename(tmpLogPath, masterLogPath);
|
|
|
|
|
|
+ LOGDIR_FS.rename(tmpLogPath, masterLogPath);
|
|
// update the cache
|
|
// update the cache
|
|
fileManager.setHistoryFile(id, masterLogPath);
|
|
fileManager.setHistoryFile(id, masterLogPath);
|
|
|
|
|
|
@@ -924,7 +1001,7 @@ public class JobHistory {
|
|
JobHistory.JobInfo.getJobHistoryLogLocationForUser(tmpLogFileName,
|
|
JobHistory.JobInfo.getJobHistoryLogLocationForUser(tmpLogFileName,
|
|
conf);
|
|
conf);
|
|
if (masterLogPath != null) {
|
|
if (masterLogPath != null) {
|
|
- fs = masterLogPath.getFileSystem(conf);
|
|
|
|
|
|
+ FileSystem fs = masterLogPath.getFileSystem(conf);
|
|
if (fs.exists(tmpLogPath)) {
|
|
if (fs.exists(tmpLogPath)) {
|
|
LOG.info("Renaming " + tmpLogFileName + " to " + masterLogFileName
|
|
LOG.info("Renaming " + tmpLogFileName + " to " + masterLogFileName
|
|
+ " in user directory");
|
|
+ " in user directory");
|
|
@@ -966,29 +1043,27 @@ public class JobHistory {
|
|
* This *should* be the last call to jobhistory for a given job.
|
|
* This *should* be the last call to jobhistory for a given job.
|
|
*/
|
|
*/
|
|
static void markCompleted(JobID id) throws IOException {
|
|
static void markCompleted(JobID id) throws IOException {
|
|
|
|
+ List<Path> paths = new ArrayList<Path>();
|
|
Path path = fileManager.getHistoryFile(id);
|
|
Path path = fileManager.getHistoryFile(id);
|
|
if (path == null) {
|
|
if (path == null) {
|
|
LOG.info("No file for job-history with " + id + " found in cache!");
|
|
LOG.info("No file for job-history with " + id + " found in cache!");
|
|
return;
|
|
return;
|
|
|
|
+ } else {
|
|
|
|
+ paths.add(path);
|
|
}
|
|
}
|
|
- Path newPath = new Path(DONE, path.getName());
|
|
|
|
- LOG.info("Moving completed job from " + path + " to " + newPath);
|
|
|
|
- FileSystem fs = path.getFileSystem(jtConf);
|
|
|
|
- fs.rename(path, newPath);
|
|
|
|
|
|
|
|
Path confPath = fileManager.getConfFileWriters(id);
|
|
Path confPath = fileManager.getConfFileWriters(id);
|
|
if (confPath == null) {
|
|
if (confPath == null) {
|
|
LOG.info("No file for jobconf with " + id + " found in cache!");
|
|
LOG.info("No file for jobconf with " + id + " found in cache!");
|
|
return;
|
|
return;
|
|
|
|
+ } else {
|
|
|
|
+ paths.add(confPath);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ //move the job files to done folder and purge the job
|
|
|
|
+ if (paths.size() > 0) {
|
|
|
|
+ fileManager.moveToDone(id, paths);
|
|
}
|
|
}
|
|
- // move the conf too
|
|
|
|
- newPath = new Path(DONE, confPath.getName());
|
|
|
|
- LOG.info("Moving configuration of completed job from " + confPath
|
|
|
|
- + " to " + newPath);
|
|
|
|
- fs.rename(confPath, newPath);
|
|
|
|
-
|
|
|
|
- // purge the job from the cache
|
|
|
|
- fileManager.purgeJob(id);
|
|
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -1056,20 +1131,18 @@ public class JobHistory {
|
|
|
|
|
|
if (LOG_DIR != null) {
|
|
if (LOG_DIR != null) {
|
|
// create output stream for logging in hadoop.job.history.location
|
|
// create output stream for logging in hadoop.job.history.location
|
|
- fs = new Path(LOG_DIR).getFileSystem(jobConf);
|
|
|
|
-
|
|
|
|
if (restarted) {
|
|
if (restarted) {
|
|
logFile = recoverJobHistoryFile(jobConf, logFile);
|
|
logFile = recoverJobHistoryFile(jobConf, logFile);
|
|
logFileName = logFile.getName();
|
|
logFileName = logFile.getName();
|
|
}
|
|
}
|
|
|
|
|
|
int defaultBufferSize =
|
|
int defaultBufferSize =
|
|
- fs.getConf().getInt("io.file.buffer.size", 4096);
|
|
|
|
- out = fs.create(logFile,
|
|
|
|
|
|
+ LOGDIR_FS.getConf().getInt("io.file.buffer.size", 4096);
|
|
|
|
+ out = LOGDIR_FS.create(logFile,
|
|
new FsPermission(HISTORY_FILE_PERMISSION),
|
|
new FsPermission(HISTORY_FILE_PERMISSION),
|
|
true,
|
|
true,
|
|
defaultBufferSize,
|
|
defaultBufferSize,
|
|
- fs.getDefaultReplication(),
|
|
|
|
|
|
+ LOGDIR_FS.getDefaultReplication(),
|
|
jobHistoryBlockSize, null);
|
|
jobHistoryBlockSize, null);
|
|
writer = new PrintWriter(out);
|
|
writer = new PrintWriter(out);
|
|
fileManager.addWriter(jobId, writer);
|
|
fileManager.addWriter(jobId, writer);
|
|
@@ -1147,16 +1220,15 @@ public class JobHistory {
|
|
FSDataOutputStream jobFileOut = null;
|
|
FSDataOutputStream jobFileOut = null;
|
|
try {
|
|
try {
|
|
if (LOG_DIR != null) {
|
|
if (LOG_DIR != null) {
|
|
- fs = new Path(LOG_DIR).getFileSystem(jobConf);
|
|
|
|
int defaultBufferSize =
|
|
int defaultBufferSize =
|
|
- fs.getConf().getInt("io.file.buffer.size", 4096);
|
|
|
|
- if (!fs.exists(jobFilePath)) {
|
|
|
|
- jobFileOut = fs.create(jobFilePath,
|
|
|
|
|
|
+ LOGDIR_FS.getConf().getInt("io.file.buffer.size", 4096);
|
|
|
|
+ if (!LOGDIR_FS.exists(jobFilePath)) {
|
|
|
|
+ jobFileOut = LOGDIR_FS.create(jobFilePath,
|
|
new FsPermission(HISTORY_FILE_PERMISSION),
|
|
new FsPermission(HISTORY_FILE_PERMISSION),
|
|
true,
|
|
true,
|
|
defaultBufferSize,
|
|
defaultBufferSize,
|
|
- fs.getDefaultReplication(),
|
|
|
|
- fs.getDefaultBlockSize(), null);
|
|
|
|
|
|
+ LOGDIR_FS.getDefaultReplication(),
|
|
|
|
+ LOGDIR_FS.getDefaultBlockSize(), null);
|
|
jobConf.writeXml(jobFileOut);
|
|
jobConf.writeXml(jobFileOut);
|
|
jobFileOut.close();
|
|
jobFileOut.close();
|
|
}
|
|
}
|
|
@@ -1943,13 +2015,12 @@ public class JobHistory {
|
|
lastRan = now;
|
|
lastRan = now;
|
|
isRunning = true;
|
|
isRunning = true;
|
|
try {
|
|
try {
|
|
- FileSystem fs = DONE.getFileSystem(jtConf);
|
|
|
|
- FileStatus[] historyFiles = fs.listStatus(DONE);
|
|
|
|
|
|
+ FileStatus[] historyFiles = DONEDIR_FS.listStatus(DONE);
|
|
// delete if older than 30 days
|
|
// delete if older than 30 days
|
|
if (historyFiles != null) {
|
|
if (historyFiles != null) {
|
|
for (FileStatus f : historyFiles) {
|
|
for (FileStatus f : historyFiles) {
|
|
if (now - f.getModificationTime() > THIRTY_DAYS_IN_MS) {
|
|
if (now - f.getModificationTime() > THIRTY_DAYS_IN_MS) {
|
|
- fs.delete(f.getPath(), true);
|
|
|
|
|
|
+ DONEDIR_FS.delete(f.getPath(), true);
|
|
LOG.info("Deleting old history file : " + f.getPath());
|
|
LOG.info("Deleting old history file : " + f.getPath());
|
|
}
|
|
}
|
|
}
|
|
}
|