|
@@ -20,38 +20,37 @@ package org.apache.hadoop.mapred;
|
|
|
|
|
|
import java.io.*;
|
|
|
import java.util.ArrayList;
|
|
|
-import java.util.Vector;
|
|
|
+import java.util.List;
|
|
|
|
|
|
import org.apache.commons.logging.*;
|
|
|
+import org.apache.hadoop.fs.FileUtil;
|
|
|
|
|
|
/**
|
|
|
* A simple logger to handle the task-specific user logs.
|
|
|
* This class uses the system property <code>hadoop.log.dir</code>.
|
|
|
*
|
|
|
*/
|
|
|
-class TaskLog {
|
|
|
+public class TaskLog {
|
|
|
private static final Log LOG =
|
|
|
LogFactory.getLog(TaskLog.class.getName());
|
|
|
|
|
|
private static final File LOG_DIR =
|
|
|
new File(System.getProperty("hadoop.log.dir"), "userlogs");
|
|
|
|
|
|
- private static final String SPLIT_INDEX_NAME = "split.idx";
|
|
|
-
|
|
|
static {
|
|
|
if (!LOG_DIR.exists()) {
|
|
|
LOG_DIR.mkdirs();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private static File getTaskLogDir(String taskid, LogFilter filter) {
|
|
|
- return new File(new File(LOG_DIR, taskid), filter.getPrefix());
|
|
|
+ public static File getTaskLogFile(String taskid, LogName filter) {
|
|
|
+ return new File(new File(LOG_DIR, taskid), filter.toString());
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* The filter for userlogs.
|
|
|
*/
|
|
|
- public static enum LogFilter {
|
|
|
+ public static enum LogName {
|
|
|
/** Log on the stdout of the task. */
|
|
|
STDOUT ("stdout"),
|
|
|
|
|
@@ -63,500 +62,189 @@ class TaskLog {
|
|
|
|
|
|
private String prefix;
|
|
|
|
|
|
- private LogFilter(String prefix) {
|
|
|
+ private LogName(String prefix) {
|
|
|
this.prefix = prefix;
|
|
|
}
|
|
|
|
|
|
- String getPrefix() {
|
|
|
+ public String toString() {
|
|
|
return prefix;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- /**
|
|
|
- * The log-writer responsible for handling writing user-logs
|
|
|
- * and maintaining splits and ensuring job-specifc limits
|
|
|
- * w.r.t logs-size etc. are honoured.
|
|
|
- *
|
|
|
- */
|
|
|
- static class Writer {
|
|
|
- private String taskId;
|
|
|
- private LogFilter filter;
|
|
|
-
|
|
|
- private final File taskLogDir;
|
|
|
- private final int noKeepSplits;
|
|
|
- private final long splitFileSize;
|
|
|
- private final boolean purgeLogSplits;
|
|
|
- private final int logsRetainHours;
|
|
|
-
|
|
|
- private boolean initialized = false;
|
|
|
- private long splitOffset = 0;
|
|
|
- private long splitLength = 0;
|
|
|
- private int noSplits = 0;
|
|
|
-
|
|
|
- private File currentSplit; // current split filename
|
|
|
- private OutputStream out; // current split
|
|
|
- private OutputStream splitIndex; // split index file
|
|
|
-
|
|
|
- private int flushCtr = 0;
|
|
|
- private final static int FLUSH_BYTES = 256;
|
|
|
-
|
|
|
- /**
|
|
|
- * Creates a new TaskLog writer.
|
|
|
- * @param conf configuration of the task
|
|
|
- * @param taskId taskid of the task
|
|
|
- * @param filter the {@link LogFilter} to apply on userlogs.
|
|
|
- */
|
|
|
- Writer(String taskId, LogFilter filter,
|
|
|
- int noKeepSplits, long totalLogSize, boolean purgeLogSplits, int logsRetainHours) {
|
|
|
- this.taskId = taskId;
|
|
|
- this.filter = filter;
|
|
|
-
|
|
|
- this.taskLogDir = getTaskLogDir(this.taskId, this.filter);
|
|
|
-
|
|
|
- this.noKeepSplits = noKeepSplits;
|
|
|
- this.splitFileSize = (totalLogSize / noKeepSplits);
|
|
|
- this.purgeLogSplits = purgeLogSplits;
|
|
|
- this.logsRetainHours = logsRetainHours;
|
|
|
- }
|
|
|
-
|
|
|
- private static class TaskLogsPurgeFilter implements FileFilter {
|
|
|
- long purgeTimeStamp;
|
|
|
-
|
|
|
- TaskLogsPurgeFilter(long purgeTimeStamp) {
|
|
|
- this.purgeTimeStamp = purgeTimeStamp;
|
|
|
- }
|
|
|
|
|
|
- public boolean accept(File file) {
|
|
|
- LOG.debug("PurgeFilter - file: " + file + ", mtime: " + file.lastModified() + ", purge: " + purgeTimeStamp);
|
|
|
- return file.lastModified() < purgeTimeStamp;
|
|
|
- }
|
|
|
+ private static class TaskLogsPurgeFilter implements FileFilter {
|
|
|
+ long purgeTimeStamp;
|
|
|
+
|
|
|
+ TaskLogsPurgeFilter(long purgeTimeStamp) {
|
|
|
+ this.purgeTimeStamp = purgeTimeStamp;
|
|
|
}
|
|
|
|
|
|
- private File getLogSplit(int split) {
|
|
|
- String splitName = "part-" + String.format("%1$06d", split);
|
|
|
- return new File(taskLogDir, splitName);
|
|
|
+ public boolean accept(File file) {
|
|
|
+ LOG.debug("PurgeFilter - file: " + file + ", mtime: " + file.lastModified() + ", purge: " + purgeTimeStamp);
|
|
|
+ return file.lastModified() < purgeTimeStamp;
|
|
|
}
|
|
|
-
|
|
|
- private void deleteDir(File dir) throws IOException {
|
|
|
- File[] files = dir.listFiles();
|
|
|
- if (files != null) {
|
|
|
- for (int i=0; i < files.length; ++i) {
|
|
|
- if (files[i].isDirectory()) {
|
|
|
- deleteDir(files[i]);
|
|
|
- }
|
|
|
- files[i].delete();
|
|
|
- }
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * Purge old user logs.
|
|
|
+ *
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ public static synchronized void cleanup(int logsRetainHours
|
|
|
+ ) throws IOException {
|
|
|
+ // Purge logs of tasks on this tasktracker if their
|
|
|
+ // mtime has exceeded "mapred.task.log.retain" hours
|
|
|
+ long purgeTimeStamp = System.currentTimeMillis() -
|
|
|
+ (logsRetainHours*60L*60*1000);
|
|
|
+ File[] oldTaskLogs = LOG_DIR.listFiles
|
|
|
+ (new TaskLogsPurgeFilter(purgeTimeStamp));
|
|
|
+ if (oldTaskLogs != null) {
|
|
|
+ for (int i=0; i < oldTaskLogs.length; ++i) {
|
|
|
+ FileUtil.fullyDelete(oldTaskLogs[i]);
|
|
|
}
|
|
|
- boolean del = dir.delete();
|
|
|
- LOG.debug("Deleted " + dir + ": " + del);
|
|
|
}
|
|
|
-
|
|
|
+ }
|
|
|
+
|
|
|
+ public static class Reader extends InputStream {
|
|
|
+ private long bytesRemaining;
|
|
|
+ private FileInputStream file;
|
|
|
/**
|
|
|
- * Initialize the task log-writer.
|
|
|
- *
|
|
|
+ * Read a log file from start to end positions. The offsets may be negative,
|
|
|
+ * in which case they are relative to the end of the file. For example,
|
|
|
+ * Reader(taskid, kind, 0, -1) is the entire file and
|
|
|
+ * Reader(taskid, kind, -4197, -1) is the last 4196 bytes.
|
|
|
+ * @param taskid the id of the task to read the log file for
|
|
|
+ * @param kind the kind of log to read
|
|
|
+ * @param start the offset to read from (negative is relative to tail)
|
|
|
+ * @param end the offset to read upto (negative is relative to tail)
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- public synchronized void init() throws IOException {
|
|
|
- if (!initialized) {
|
|
|
- // Purge logs of tasks on this tasktracker if their
|
|
|
- // mtime has exceeded "mapred.task.log.retain" hours
|
|
|
- long purgeTimeStamp = System.currentTimeMillis() -
|
|
|
- (logsRetainHours*60*60*1000);
|
|
|
- File[] oldTaskLogs = LOG_DIR.listFiles(
|
|
|
- new TaskLogsPurgeFilter(purgeTimeStamp)
|
|
|
- );
|
|
|
- if (oldTaskLogs != null) {
|
|
|
- for (int i=0; i < oldTaskLogs.length; ++i) {
|
|
|
- deleteDir(oldTaskLogs[i]);
|
|
|
- }
|
|
|
+ public Reader(String taskid, LogName kind,
|
|
|
+ long start, long end) throws IOException {
|
|
|
+ // find the right log file
|
|
|
+ File filename = getTaskLogFile(taskid, kind);
|
|
|
+ // calculate the start and stop
|
|
|
+ long size = filename.length();
|
|
|
+ if (start < 0) {
|
|
|
+ start += size + 1;
|
|
|
+ }
|
|
|
+ if (end < 0) {
|
|
|
+ end += size + 1;
|
|
|
+ }
|
|
|
+ start = Math.max(0, Math.min(start, size));
|
|
|
+ end = Math.max(0, Math.min(end, size));
|
|
|
+ bytesRemaining = end - start;
|
|
|
+ file = new FileInputStream(filename);
|
|
|
+ // skip upto start
|
|
|
+ long pos = 0;
|
|
|
+ while (pos < start) {
|
|
|
+ long result = file.skip(start - pos);
|
|
|
+ if (result < 0) {
|
|
|
+ bytesRemaining = 0;
|
|
|
+ break;
|
|
|
}
|
|
|
-
|
|
|
- // Initialize the task's log directory
|
|
|
- if (taskLogDir.exists()) {
|
|
|
- deleteDir(taskLogDir);
|
|
|
- }
|
|
|
- taskLogDir.mkdirs();
|
|
|
-
|
|
|
- // Create the split index
|
|
|
- splitIndex = new BufferedOutputStream(
|
|
|
- new FileOutputStream(new File(taskLogDir, SPLIT_INDEX_NAME))
|
|
|
- );
|
|
|
-
|
|
|
- out = createLogSplit(noSplits);
|
|
|
- initialized = true;
|
|
|
+ pos += result;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Write a log message to the task log.
|
|
|
- *
|
|
|
- * @param b bytes to be writter
|
|
|
- * @param off start offset
|
|
|
- * @param len length of data
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- public synchronized void write(byte[] b, int off, int len)
|
|
|
- throws IOException {
|
|
|
- // Check if we need to rotate the log
|
|
|
- if (splitLength > splitFileSize) {
|
|
|
- LOG.debug("Total no. of bytes written to split#" + noSplits +
|
|
|
- " -> " + splitLength);
|
|
|
- logRotate();
|
|
|
+ public int read() throws IOException {
|
|
|
+ int result = -1;
|
|
|
+ if (bytesRemaining > 0) {
|
|
|
+ bytesRemaining -= 1;
|
|
|
+ result = file.read();
|
|
|
}
|
|
|
-
|
|
|
- // Periodically flush data to disk
|
|
|
- if (flushCtr > FLUSH_BYTES) {
|
|
|
- out.flush();
|
|
|
- flushCtr = 0;
|
|
|
- }
|
|
|
-
|
|
|
- // Write out to the log-split
|
|
|
- out.write(b, off, len);
|
|
|
- splitLength += len;
|
|
|
- flushCtr += len;
|
|
|
+ return result;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Close the task log.
|
|
|
- *
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- public synchronized void close() throws IOException {
|
|
|
- // Close the final split
|
|
|
- if (out != null) {
|
|
|
- out.close();
|
|
|
- }
|
|
|
-
|
|
|
- // Close the split-index
|
|
|
- if (splitIndex != null) {
|
|
|
- writeIndexRecord();
|
|
|
- splitIndex.close();
|
|
|
+ public int read(byte[] buffer, int offset, int length) throws IOException {
|
|
|
+ length = (int) Math.min(length, bytesRemaining);
|
|
|
+ int bytes = file.read(buffer, offset, length);
|
|
|
+ if (bytes > 0) {
|
|
|
+ bytesRemaining -= bytes;
|
|
|
}
|
|
|
- }
|
|
|
-
|
|
|
- private synchronized OutputStream createLogSplit(int split)
|
|
|
- throws IOException {
|
|
|
- currentSplit = getLogSplit(split);
|
|
|
- LOG.debug("About to create the split: " + currentSplit);
|
|
|
- // Record the 'split' in the index
|
|
|
- writeIndexRecord();
|
|
|
- return new BufferedOutputStream(new FileOutputStream(currentSplit));
|
|
|
+ return bytes;
|
|
|
}
|
|
|
|
|
|
- private synchronized void writeIndexRecord() throws IOException {
|
|
|
- String indexRecord = currentSplit + "|" + splitOffset + "\n";
|
|
|
- splitIndex.write(indexRecord.getBytes());
|
|
|
- splitIndex.flush();
|
|
|
+ public int available() throws IOException {
|
|
|
+ return (int) Math.min(bytesRemaining, file.available());
|
|
|
}
|
|
|
-
|
|
|
- private synchronized void logRotate() throws IOException {
|
|
|
- // Close the current split
|
|
|
- LOG.debug("About to rotate-out the split: " + noSplits);
|
|
|
- out.close();
|
|
|
-
|
|
|
- // Re-initialize the state
|
|
|
- splitOffset += splitLength;
|
|
|
- splitLength = 0;
|
|
|
- flushCtr = 0;
|
|
|
|
|
|
- // New 'split'
|
|
|
- ++noSplits;
|
|
|
-
|
|
|
- // Check if we need to purge an old split
|
|
|
- if (purgeLogSplits) {
|
|
|
- if (noSplits >= noKeepSplits) { // noSplits is zero-based
|
|
|
- File purgeLogSplit = getLogSplit((noSplits-noKeepSplits));
|
|
|
- purgeLogSplit.delete();
|
|
|
- LOG.debug("Purged log-split #" + (noSplits-noKeepSplits) + " - " +
|
|
|
- purgeLogSplit);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // Rotate the log
|
|
|
- out = createLogSplit(noSplits);
|
|
|
+ public void close() throws IOException {
|
|
|
+ file.close();
|
|
|
}
|
|
|
-
|
|
|
- } // TaskLog.Writer
|
|
|
+ }
|
|
|
|
|
|
+ private static final String bashCommand = "bash";
|
|
|
+ private static final String tailCommand = "tail";
|
|
|
+
|
|
|
/**
|
|
|
- * The log-reader for reading the 'split' user-logs.
|
|
|
- *
|
|
|
+ * Get the desired maximum length of task's logs.
|
|
|
+ * @param conf the job to look in
|
|
|
+ * @return the number of bytes to cap the log files at
|
|
|
*/
|
|
|
- static class Reader {
|
|
|
- private String taskId;
|
|
|
- private LogFilter filter;
|
|
|
-
|
|
|
- private File taskLogDir;
|
|
|
- private boolean initialized = false;
|
|
|
-
|
|
|
- private IndexRecord[] indexRecords = null;
|
|
|
- private BufferedReader splitIndex;
|
|
|
-
|
|
|
- private long logFileSize = 0;
|
|
|
-
|
|
|
- /**
|
|
|
- * Create a new task log reader.
|
|
|
- *
|
|
|
- * @param taskId task id of the task.
|
|
|
- * @param filter the {@link LogFilter} to apply on userlogs.
|
|
|
- */
|
|
|
- public Reader(String taskId, LogFilter filter) {
|
|
|
- this.taskId = taskId;
|
|
|
- this.filter = filter;
|
|
|
-
|
|
|
- this.taskLogDir = getTaskLogDir(this.taskId, this.filter);
|
|
|
- }
|
|
|
-
|
|
|
- private static class IndexRecord {
|
|
|
- String splitName;
|
|
|
- long splitOffset;
|
|
|
-
|
|
|
- IndexRecord(String splitName, long splitOffset) {
|
|
|
- this.splitName = splitName;
|
|
|
- this.splitOffset = splitOffset;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private synchronized void init() throws IOException {
|
|
|
- this.splitIndex = new BufferedReader(new InputStreamReader(
|
|
|
- new FileInputStream(new File(taskLogDir,
|
|
|
- SPLIT_INDEX_NAME))));
|
|
|
-
|
|
|
- // Parse the split-index and store the offsets/lengths
|
|
|
- ArrayList<IndexRecord> records = new ArrayList<IndexRecord>();
|
|
|
- String line;
|
|
|
- while ((line = splitIndex.readLine()) != null) {
|
|
|
- String[] fields = line.split("\\|");
|
|
|
- if (fields.length != 2) {
|
|
|
- throw new IOException("Malformed split-index with " +
|
|
|
- fields.length + " fields");
|
|
|
- }
|
|
|
-
|
|
|
- IndexRecord record = new IndexRecord(
|
|
|
- fields[0],
|
|
|
- Long.valueOf(fields[1]).longValue()
|
|
|
- );
|
|
|
- LOG.debug("Split: <" + record.splitName + ", " + record.splitOffset + ">");
|
|
|
-
|
|
|
- // Save
|
|
|
- records.add(record);
|
|
|
- }
|
|
|
-
|
|
|
- indexRecords = new IndexRecord[records.size()];
|
|
|
- indexRecords = records.toArray(indexRecords);
|
|
|
- IndexRecord lastRecord = indexRecords[records.size() - 1];
|
|
|
- logFileSize = lastRecord.splitOffset
|
|
|
- + new File(lastRecord.splitName).length();
|
|
|
- initialized = true;
|
|
|
- LOG.debug("Log size: " + logFileSize);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Return the total 'logical' log-size written by the task, including
|
|
|
- * purged data.
|
|
|
- *
|
|
|
- * @return the total 'logical' log-size written by the task, including
|
|
|
- * purged data.
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- public synchronized long getTotalLogSize() throws IOException {
|
|
|
- if (!initialized) {
|
|
|
- init();
|
|
|
- }
|
|
|
-
|
|
|
- return logFileSize;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Return the entire user-log (remaining splits).
|
|
|
- *
|
|
|
- * @return Returns a <code>byte[]</code> containing the data in user-log.
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- public synchronized byte[] fetchAll() throws IOException {
|
|
|
- if (!initialized) {
|
|
|
- init();
|
|
|
- }
|
|
|
-
|
|
|
- // Get all splits
|
|
|
- Vector<InputStream> streams = new Vector<InputStream>();
|
|
|
- for (int i=0; i < indexRecords.length; ++i) {
|
|
|
- InputStream stream = getLogSplit(i);
|
|
|
- if (stream != null) {
|
|
|
- streams.add(stream);
|
|
|
- LOG.debug("Added split: " + i);
|
|
|
- }
|
|
|
- }
|
|
|
- LOG.debug("Total log-size on disk: " + logFileSize);
|
|
|
-
|
|
|
- // Copy log data into buffer
|
|
|
- byte[] b = new byte[(int) logFileSize];
|
|
|
- SequenceInputStream in = new SequenceInputStream(streams.elements());
|
|
|
- try {
|
|
|
- int bytesRead = 0;
|
|
|
- int off = 0;
|
|
|
- LOG.debug("Attempting to read " + logFileSize + " bytes from logs");
|
|
|
- while ((bytesRead = in.read(b, off, (int) logFileSize - off)) > 0) {
|
|
|
- LOG.debug("Got " + bytesRead + " bytes");
|
|
|
- off += bytesRead;
|
|
|
- }
|
|
|
-
|
|
|
- if (off != logFileSize) {
|
|
|
- LOG.debug("Didn't not read all requisite data in logs!");
|
|
|
- }
|
|
|
- } finally {
|
|
|
- try { in.close(); } catch (IOException ex) {}
|
|
|
- }
|
|
|
- return b;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Tail the user-log.
|
|
|
- *
|
|
|
- * @param b the buffer into which the data is read.
|
|
|
- * @param off the start offset in array <code>b</code>
|
|
|
- * at which the data is written.
|
|
|
- * @param len the maximum number of bytes to read.
|
|
|
- * @param tailSize the no. of bytes to be read from end of file.
|
|
|
- * @param tailWindow the sliding window for tailing the logs.
|
|
|
- * @return the total number of bytes of user-logs dataread into the buffer.
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- public synchronized int tail(byte[] b, int off, int len,
|
|
|
- long tailSize, int tailWindow)
|
|
|
- throws IOException {
|
|
|
- if (!initialized) {
|
|
|
- init();
|
|
|
- }
|
|
|
-
|
|
|
- LOG.debug("tailSize: " + tailSize + " - tailWindow: " + tailWindow);
|
|
|
-
|
|
|
- if (tailSize*tailWindow > logFileSize) {
|
|
|
- tailSize = logFileSize;
|
|
|
- tailWindow = 1;
|
|
|
- }
|
|
|
-
|
|
|
- return read(b, off, len,
|
|
|
- (long)(logFileSize-(tailSize*tailWindow)), tailSize);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Read user-log data given an offset/length.
|
|
|
- *
|
|
|
- * @param b the buffer into which the data is read.
|
|
|
- * @param off the start offset in array <code>b</code>
|
|
|
- * at which the data is written.
|
|
|
- * @param len the maximum number of bytes to read.
|
|
|
- * @param logOffset the offset of the user-log from which to get data.
|
|
|
- * @param logLength the maximum number of bytes of user-log data to fetch.
|
|
|
- * @return the total number of bytes of user-logs dataread into the buffer.
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- public synchronized int read(byte[] b, int off, int len,
|
|
|
- long logOffset, long logLength)
|
|
|
- throws IOException {
|
|
|
- LOG.debug("TaskLog.Reader.read: logOffset: " + logOffset + " - logLength: " + logLength);
|
|
|
-
|
|
|
- // Sanity check
|
|
|
- if (logLength == 0) {
|
|
|
- return 0;
|
|
|
- }
|
|
|
-
|
|
|
- if (!initialized) {
|
|
|
- init();
|
|
|
- }
|
|
|
-
|
|
|
- // Locate the requisite splits
|
|
|
- Vector<InputStream> streams = new Vector<InputStream>();
|
|
|
- long offset = logOffset;
|
|
|
- int startIndex = -1, stopIndex = -1;
|
|
|
- boolean inRange = false;
|
|
|
- for (int i=0; i < indexRecords.length; ++i) {
|
|
|
- LOG.debug("offset: " + offset + " - (split, splitOffset) : (" +
|
|
|
- i + ", " + indexRecords[i].splitOffset + ")");
|
|
|
-
|
|
|
- if (offset <= indexRecords[i].splitOffset) {
|
|
|
- if (!inRange) {
|
|
|
- startIndex = i - ((i > 0) ? 1 : 0);
|
|
|
- LOG.debug("Starting at split: " + startIndex);
|
|
|
- offset += logLength;
|
|
|
- InputStream stream = getLogSplit(startIndex);
|
|
|
- if (stream != null) {
|
|
|
- streams.add(stream);
|
|
|
- }
|
|
|
- LOG.debug("Added split: " + startIndex);
|
|
|
- inRange = true;
|
|
|
- } else {
|
|
|
- stopIndex = i-1;
|
|
|
- LOG.debug("Stop at split: " + stopIndex);
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- if (inRange) {
|
|
|
- InputStream stream = getLogSplit(i);
|
|
|
- if (stream != null) {
|
|
|
- streams.add(stream);
|
|
|
- }
|
|
|
- LOG.debug("Added split: " + i);
|
|
|
- }
|
|
|
- }
|
|
|
- if (startIndex == -1) {
|
|
|
- throw new IOException("Illegal logOffset/logLength");
|
|
|
- }
|
|
|
- if (stopIndex == -1) {
|
|
|
- stopIndex = indexRecords.length - 1;
|
|
|
- LOG.debug("Stop at split: " + stopIndex);
|
|
|
-
|
|
|
- // Check if request exceeds the log-file size
|
|
|
- if ((logOffset+logLength) > logFileSize) {
|
|
|
- LOG.debug("logOffset+logLength exceeds log-file size");
|
|
|
- logLength = logFileSize - logOffset;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // Copy requisite data into user buffer
|
|
|
- SequenceInputStream in = new SequenceInputStream(streams.elements());
|
|
|
- int totalBytesRead = 0;
|
|
|
- try {
|
|
|
- if (streams.size() == (stopIndex - startIndex +1)) {
|
|
|
- // Skip to get to 'logOffset' if logs haven't been purged
|
|
|
- long skipBytes =
|
|
|
- in.skip(logOffset - indexRecords[startIndex].splitOffset);
|
|
|
- LOG.debug("Skipped " + skipBytes + " bytes from " +
|
|
|
- startIndex + " stream");
|
|
|
- }
|
|
|
- int bytesRead = 0;
|
|
|
- len = Math.min((int)logLength, len);
|
|
|
- LOG.debug("Attempting to read " + len + " bytes from logs");
|
|
|
- while ((bytesRead = in.read(b, off, len)) > 0) {
|
|
|
- off += bytesRead;
|
|
|
- len -= bytesRead;
|
|
|
-
|
|
|
- totalBytesRead += bytesRead;
|
|
|
- }
|
|
|
- } finally {
|
|
|
- try { in.close(); } catch (IOException e) {}
|
|
|
- }
|
|
|
-
|
|
|
- return totalBytesRead;
|
|
|
- }
|
|
|
-
|
|
|
- private synchronized InputStream getLogSplit(int split)
|
|
|
- throws IOException {
|
|
|
- String splitName = indexRecords[split].splitName;
|
|
|
- LOG.debug("About to open the split: " + splitName);
|
|
|
- InputStream in = null;
|
|
|
- try {
|
|
|
- in = new BufferedInputStream(new FileInputStream(new File(splitName)));
|
|
|
- } catch (FileNotFoundException fnfe) {
|
|
|
- in = null;
|
|
|
- LOG.debug("Split " + splitName + " not found... probably purged!");
|
|
|
- }
|
|
|
-
|
|
|
- return in;
|
|
|
- }
|
|
|
-
|
|
|
- } // TaskLog.Reader
|
|
|
+ public static long getTaskLogLength(JobConf conf) {
|
|
|
+ return conf.getLong("mapred.userlog.limit.kb", 100) * 1024;
|
|
|
+ }
|
|
|
|
|
|
+ /**
|
|
|
+ * Wrap a command in a shell to capture stdout and stderr to files.
|
|
|
+ * If the tailLength is 0, the entire output will be saved.
|
|
|
+ * @param cmd The command and the arguments that should be run
|
|
|
+ * @param stdoutFilename The filename that stdout should be saved to
|
|
|
+ * @param stderrFilename The filename that stderr should be saved to
|
|
|
+ * @param tailLength The length of the tail to be saved.
|
|
|
+ * @return the modified command that should be run
|
|
|
+ */
|
|
|
+ public static List<String> captureOutAndError(List<String> cmd,
|
|
|
+ File stdoutFilename,
|
|
|
+ File stderrFilename,
|
|
|
+ long tailLength
|
|
|
+ ) throws IOException {
|
|
|
+ String stdout = FileUtil.makeShellPath(stdoutFilename);
|
|
|
+ String stderr = FileUtil.makeShellPath(stderrFilename);
|
|
|
+ List<String> result = new ArrayList<String>(3);
|
|
|
+ result.add(bashCommand);
|
|
|
+ result.add("-c");
|
|
|
+ StringBuffer mergedCmd = new StringBuffer();
|
|
|
+ if (tailLength > 0) {
|
|
|
+ mergedCmd.append("(");
|
|
|
+ } else {
|
|
|
+ mergedCmd.append("exec ");
|
|
|
+ }
|
|
|
+ boolean isExecutable = true;
|
|
|
+ for(String s: cmd) {
|
|
|
+ mergedCmd.append('\'');
|
|
|
+ if (isExecutable) {
|
|
|
+ // the executable name needs to be expressed as a shell path for the
|
|
|
+ // shell to find it.
|
|
|
+ mergedCmd.append(FileUtil.makeShellPath(new File(s)));
|
|
|
+ isExecutable = false;
|
|
|
+ } else {
|
|
|
+ mergedCmd.append(s);
|
|
|
+ }
|
|
|
+ mergedCmd.append('\'');
|
|
|
+ mergedCmd.append(" ");
|
|
|
+ }
|
|
|
+ mergedCmd.append(" < /dev/null ");
|
|
|
+ if (tailLength > 0) {
|
|
|
+ mergedCmd.append(" | ");
|
|
|
+ mergedCmd.append(tailCommand);
|
|
|
+ mergedCmd.append(" -c ");
|
|
|
+ mergedCmd.append(tailLength);
|
|
|
+ mergedCmd.append(" >> ");
|
|
|
+ mergedCmd.append(stdout);
|
|
|
+ mergedCmd.append(" ; exit $PIPESTATUS ) 2>&1 | ");
|
|
|
+ mergedCmd.append(tailCommand);
|
|
|
+ mergedCmd.append(" -c ");
|
|
|
+ mergedCmd.append(tailLength);
|
|
|
+ mergedCmd.append(" >> ");
|
|
|
+ mergedCmd.append(stderr);
|
|
|
+ mergedCmd.append(" ; exit $PIPESTATUS");
|
|
|
+ } else {
|
|
|
+ mergedCmd.append(" 1>> ");
|
|
|
+ mergedCmd.append(stdout);
|
|
|
+ mergedCmd.append(" 2>> ");
|
|
|
+ mergedCmd.append(stderr);
|
|
|
+ }
|
|
|
+ result.add(mergedCmd.toString());
|
|
|
+ return result;
|
|
|
+ }
|
|
|
} // TaskLog
|