|
@@ -45,6 +45,27 @@ class TaskLog {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * The filter for userlogs.
|
|
|
+ */
|
|
|
+ public static enum LogFilter {
|
|
|
+ /** Log on the stdout of the task. */
|
|
|
+ STDOUT ("stdout"),
|
|
|
+
|
|
|
+ /** Log on the stderr of the task. */
|
|
|
+ STDERR ("stderr");
|
|
|
+
|
|
|
+ private String prefix;
|
|
|
+
|
|
|
+ private LogFilter(String prefix) {
|
|
|
+ this.prefix = prefix;
|
|
|
+ }
|
|
|
+
|
|
|
+ String getPrefix() {
|
|
|
+ return prefix;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* The log-writer responsible for handling writing user-logs
|
|
|
* and maintaining splits and ensuring job-specifc limits
|
|
@@ -55,6 +76,7 @@ class TaskLog {
|
|
|
static class Writer {
|
|
|
private String taskId;
|
|
|
private JobConf conf;
|
|
|
+ private LogFilter filter;
|
|
|
|
|
|
private final File taskLogDir;
|
|
|
private final int noKeepSplits;
|
|
@@ -78,11 +100,15 @@ class TaskLog {
|
|
|
* Creates a new TaskLog writer.
|
|
|
* @param conf configuration of the task
|
|
|
* @param taskId taskid of the task
|
|
|
+ * @param filter the {@link LogFilter} to apply on userlogs.
|
|
|
*/
|
|
|
- Writer(JobConf conf, String taskId) {
|
|
|
+ Writer(JobConf conf, String taskId, LogFilter filter) {
|
|
|
this.conf = conf;
|
|
|
this.taskId = taskId;
|
|
|
- this.taskLogDir = new File(LOG_DIR, this.taskId);
|
|
|
+ this.filter = filter;
|
|
|
+
|
|
|
+ this.taskLogDir = new File(new File(LOG_DIR, this.taskId),
|
|
|
+ this.filter.getPrefix());
|
|
|
|
|
|
noKeepSplits = this.conf.getInt("mapred.userlog.num.splits", 4);
|
|
|
splitFileSize =
|
|
@@ -114,11 +140,14 @@ class TaskLog {
|
|
|
File[] files = dir.listFiles();
|
|
|
if (files != null) {
|
|
|
for (int i=0; i < files.length; ++i) {
|
|
|
+ if (files[i].isDirectory()) {
|
|
|
+ deleteDir(files[i]);
|
|
|
+ }
|
|
|
files[i].delete();
|
|
|
}
|
|
|
}
|
|
|
boolean del = dir.delete();
|
|
|
- LOG.debug("Deleted " + del + ": " + del);
|
|
|
+ LOG.debug("Deleted " + dir + ": " + del);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -257,6 +286,8 @@ class TaskLog {
|
|
|
*/
|
|
|
static class Reader {
|
|
|
private String taskId;
|
|
|
+ private LogFilter filter;
|
|
|
+
|
|
|
private File taskLogDir;
|
|
|
private boolean initialized = false;
|
|
|
|
|
@@ -269,10 +300,14 @@ class TaskLog {
|
|
|
* Create a new task log reader.
|
|
|
*
|
|
|
* @param taskId task id of the task.
|
|
|
+ * @param filter the {@link LogFilter} to apply on userlogs.
|
|
|
*/
|
|
|
- public Reader(String taskId) {
|
|
|
+ public Reader(String taskId, LogFilter filter) {
|
|
|
this.taskId = taskId;
|
|
|
- this.taskLogDir = new File(LOG_DIR, this.taskId);
|
|
|
+ this.filter = filter;
|
|
|
+
|
|
|
+ this.taskLogDir = new File(new File(LOG_DIR, this.taskId),
|
|
|
+ this.filter.getPrefix());
|
|
|
}
|
|
|
|
|
|
private static class IndexRecord {
|
|
@@ -289,8 +324,8 @@ class TaskLog {
|
|
|
|
|
|
private synchronized void init() throws IOException {
|
|
|
this.splitIndex = new BufferedReader(new InputStreamReader(
|
|
|
- new FileInputStream(new File(taskLogDir, SPLIT_INDEX_NAME))
|
|
|
- ));
|
|
|
+ new FileInputStream(new File(taskLogDir,
|
|
|
+ SPLIT_INDEX_NAME))));
|
|
|
|
|
|
// Parse the split-index and store the offsets/lengths
|
|
|
ArrayList<IndexRecord> records = new ArrayList<IndexRecord>();
|
|
@@ -336,6 +371,7 @@ class TaskLog {
|
|
|
|
|
|
return logFileSize;
|
|
|
}
|
|
|
+
|
|
|
/**
|
|
|
* Return the entire user-log (remaining splits).
|
|
|
*
|
|
@@ -427,7 +463,7 @@ class TaskLog {
|
|
|
public synchronized int read(byte[] b, int off, int len,
|
|
|
long logOffset, long logLength)
|
|
|
throws IOException {
|
|
|
- LOG.debug("TaskeLog.Reader.read: logOffset: " + logOffset + " - logLength: " + logLength);
|
|
|
+ LOG.debug("TaskLog.Reader.read: logOffset: " + logOffset + " - logLength: " + logLength);
|
|
|
|
|
|
// Sanity check
|
|
|
if (logLength == 0) {
|