|
@@ -27,6 +27,7 @@ import java.io.File;
|
|
|
import java.io.FileInputStream;
|
|
|
import java.io.IOException;
|
|
|
import java.io.InputStreamReader;
|
|
|
+import java.io.OutputStream;
|
|
|
import java.io.PrintStream;
|
|
|
import java.io.Writer;
|
|
|
import java.security.PrivilegedExceptionAction;
|
|
@@ -44,6 +45,7 @@ import java.util.Set;
|
|
|
import java.util.regex.Pattern;
|
|
|
|
|
|
import org.apache.commons.io.input.BoundedInputStream;
|
|
|
+import org.apache.commons.io.output.WriterOutputStream;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|
@@ -233,9 +235,6 @@ public class AggregatedLogFormat {
|
|
|
// Write the logFile Type
|
|
|
out.writeUTF(logFile.getName());
|
|
|
|
|
|
- // Write the uploaded TimeStamp
|
|
|
- out.writeLong(System.currentTimeMillis());
|
|
|
-
|
|
|
// Write the log length as UTF so that it is printable
|
|
|
out.writeUTF(String.valueOf(fileLength));
|
|
|
|
|
@@ -400,6 +399,11 @@ public class AggregatedLogFormat {
|
|
|
writeVersion();
|
|
|
}
|
|
|
|
|
|
+ @VisibleForTesting
|
|
|
+ public TFile.Writer getWriter() {
|
|
|
+ return this.writer;
|
|
|
+ }
|
|
|
+
|
|
|
private void writeVersion() throws IOException {
|
|
|
DataOutputStream out = this.writer.prepareAppendKey(-1);
|
|
|
VERSION_KEY.write(out);
|
|
@@ -639,70 +643,55 @@ public class AggregatedLogFormat {
|
|
|
* Writes all logs for a single container to the provided writer.
|
|
|
* @param valueStream
|
|
|
* @param writer
|
|
|
+ * @param logUploadedTime
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
public static void readAcontainerLogs(DataInputStream valueStream,
|
|
|
- Writer writer) throws IOException {
|
|
|
- int bufferSize = 65536;
|
|
|
- char[] cbuf = new char[bufferSize];
|
|
|
- String fileType;
|
|
|
- long uploadTime;
|
|
|
- String fileLengthStr;
|
|
|
- long fileLength;
|
|
|
-
|
|
|
- while (true) {
|
|
|
- try {
|
|
|
- fileType = valueStream.readUTF();
|
|
|
- } catch (EOFException e) {
|
|
|
- // EndOfFile
|
|
|
- return;
|
|
|
- }
|
|
|
- uploadTime = valueStream.readLong();
|
|
|
- fileLengthStr = valueStream.readUTF();
|
|
|
- fileLength = Long.parseLong(fileLengthStr);
|
|
|
- writer.write("\n\nLogType:");
|
|
|
- writer.write(fileType);
|
|
|
- writer.write("\nLogUploadTime:");
|
|
|
- writer.write(String.valueOf(uploadTime));
|
|
|
- writer.write("\nLogLength:");
|
|
|
- writer.write(fileLengthStr);
|
|
|
- writer.write("\nLog Contents:\n");
|
|
|
- // ByteLevel
|
|
|
- BoundedInputStream bis =
|
|
|
- new BoundedInputStream(valueStream, fileLength);
|
|
|
- InputStreamReader reader = new InputStreamReader(bis);
|
|
|
- int currentRead = 0;
|
|
|
- int totalRead = 0;
|
|
|
- while ((currentRead = reader.read(cbuf, 0, bufferSize)) != -1) {
|
|
|
- writer.write(cbuf, 0, currentRead);
|
|
|
- totalRead += currentRead;
|
|
|
+ Writer writer, long logUploadedTime) throws IOException {
|
|
|
+ OutputStream os = null;
|
|
|
+ PrintStream ps = null;
|
|
|
+ try {
|
|
|
+ os = new WriterOutputStream(writer);
|
|
|
+ ps = new PrintStream(os);
|
|
|
+ while (true) {
|
|
|
+ try {
|
|
|
+ readContainerLogs(valueStream, ps, logUploadedTime);
|
|
|
+ } catch (EOFException e) {
|
|
|
+ // EndOfFile
|
|
|
+ return;
|
|
|
+ }
|
|
|
}
|
|
|
+ } finally {
|
|
|
+ IOUtils.cleanup(LOG, ps);
|
|
|
+ IOUtils.cleanup(LOG, os);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Keep calling this till you get a {@link EOFException} for getting logs of
|
|
|
- * all types for a single container.
|
|
|
- *
|
|
|
+ * Writes all logs for a single container to the provided writer.
|
|
|
* @param valueStream
|
|
|
- * @param out
|
|
|
+ * @param writer
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- public static void readAContainerLogsForALogType(
|
|
|
- DataInputStream valueStream, PrintStream out)
|
|
|
- throws IOException {
|
|
|
+ public static void readAcontainerLogs(DataInputStream valueStream,
|
|
|
+ Writer writer) throws IOException {
|
|
|
+ readAcontainerLogs(valueStream, writer, -1);
|
|
|
+ }
|
|
|
|
|
|
+ private static void readContainerLogs(DataInputStream valueStream,
|
|
|
+ PrintStream out, long logUploadedTime) throws IOException {
|
|
|
byte[] buf = new byte[65535];
|
|
|
|
|
|
String fileType = valueStream.readUTF();
|
|
|
- long uploadTime = valueStream.readLong();
|
|
|
String fileLengthStr = valueStream.readUTF();
|
|
|
long fileLength = Long.parseLong(fileLengthStr);
|
|
|
- out.print("LogType: ");
|
|
|
+ out.print("LogType:");
|
|
|
out.println(fileType);
|
|
|
- out.print("LogUploadTime: ");
|
|
|
- out.println(Times.format(uploadTime));
|
|
|
- out.print("LogLength: ");
|
|
|
+ if (logUploadedTime != -1) {
|
|
|
+ out.print("Log Upload Time:");
|
|
|
+ out.println(Times.format(logUploadedTime));
|
|
|
+ }
|
|
|
+ out.print("LogLength:");
|
|
|
out.println(fileLengthStr);
|
|
|
out.println("Log Contents:");
|
|
|
|
|
@@ -723,6 +712,35 @@ public class AggregatedLogFormat {
|
|
|
out.println("");
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Keep calling this till you get a {@link EOFException} for getting logs of
|
|
|
+ * all types for a single container.
|
|
|
+ *
|
|
|
+ * @param valueStream
|
|
|
+ * @param out
|
|
|
+ * @param logUploadedTime
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ public static void readAContainerLogsForALogType(
|
|
|
+ DataInputStream valueStream, PrintStream out, long logUploadedTime)
|
|
|
+ throws IOException {
|
|
|
+ readContainerLogs(valueStream, out, logUploadedTime);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Keep calling this till you get a {@link EOFException} for getting logs of
|
|
|
+ * all types for a single container.
|
|
|
+ *
|
|
|
+ * @param valueStream
|
|
|
+ * @param out
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ public static void readAContainerLogsForALogType(
|
|
|
+ DataInputStream valueStream, PrintStream out)
|
|
|
+ throws IOException {
|
|
|
+ readAContainerLogsForALogType(valueStream, out, -1);
|
|
|
+ }
|
|
|
+
|
|
|
public void close() {
|
|
|
IOUtils.cleanup(LOG, scanner, reader, fsDataIStream);
|
|
|
}
|
|
@@ -732,7 +750,6 @@ public class AggregatedLogFormat {
|
|
|
public static class ContainerLogsReader {
|
|
|
private DataInputStream valueStream;
|
|
|
private String currentLogType = null;
|
|
|
- private long currentLogUpLoadTime = 0;
|
|
|
private long currentLogLength = 0;
|
|
|
private BoundedInputStream currentLogData = null;
|
|
|
private InputStreamReader currentLogISR;
|
|
@@ -753,14 +770,12 @@ public class AggregatedLogFormat {
|
|
|
}
|
|
|
|
|
|
currentLogType = null;
|
|
|
- currentLogUpLoadTime = 0;
|
|
|
currentLogLength = 0;
|
|
|
currentLogData = null;
|
|
|
currentLogISR = null;
|
|
|
|
|
|
try {
|
|
|
String logType = valueStream.readUTF();
|
|
|
- long logUpLoadTime = valueStream.readLong();
|
|
|
String logLengthStr = valueStream.readUTF();
|
|
|
currentLogLength = Long.parseLong(logLengthStr);
|
|
|
currentLogData =
|
|
@@ -768,7 +783,6 @@ public class AggregatedLogFormat {
|
|
|
currentLogData.setPropagateClose(false);
|
|
|
currentLogISR = new InputStreamReader(currentLogData);
|
|
|
currentLogType = logType;
|
|
|
- currentLogUpLoadTime = logUpLoadTime;
|
|
|
} catch (EOFException e) {
|
|
|
}
|
|
|
|
|
@@ -779,10 +793,6 @@ public class AggregatedLogFormat {
|
|
|
return currentLogType;
|
|
|
}
|
|
|
|
|
|
- public long getCurrentLogUpLoadTime() {
|
|
|
- return currentLogUpLoadTime;
|
|
|
- }
|
|
|
-
|
|
|
public long getCurrentLogLength() {
|
|
|
return currentLogLength;
|
|
|
}
|