|
@@ -40,7 +40,6 @@ import javax.ws.rs.core.Response;
|
|
|
import javax.ws.rs.core.StreamingOutput;
|
|
|
import javax.ws.rs.core.Response.ResponseBuilder;
|
|
|
import javax.ws.rs.core.Response.Status;
|
|
|
-
|
|
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
|
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
@@ -363,86 +362,94 @@ public class AHSWebServices extends WebServices {
|
|
|
if ((nodeId == null || nodeName.contains(LogAggregationUtils
|
|
|
.getNodeString(nodeId))) && !nodeName.endsWith(
|
|
|
LogAggregationUtils.TMP_FILE_SUFFIX)) {
|
|
|
- AggregatedLogFormat.LogReader reader =
|
|
|
- new AggregatedLogFormat.LogReader(conf,
|
|
|
- thisNodeFile.getPath());
|
|
|
- DataInputStream valueStream;
|
|
|
- LogKey key = new LogKey();
|
|
|
- valueStream = reader.next(key);
|
|
|
- while (valueStream != null && !key.toString()
|
|
|
- .equals(containerIdStr)) {
|
|
|
- // Next container
|
|
|
- key = new LogKey();
|
|
|
+ AggregatedLogFormat.LogReader reader = null;
|
|
|
+ try {
|
|
|
+ reader = new AggregatedLogFormat.LogReader(conf,
|
|
|
+ thisNodeFile.getPath());
|
|
|
+ DataInputStream valueStream;
|
|
|
+ LogKey key = new LogKey();
|
|
|
valueStream = reader.next(key);
|
|
|
- }
|
|
|
- if (valueStream == null) {
|
|
|
- continue;
|
|
|
- }
|
|
|
- while (true) {
|
|
|
- try {
|
|
|
- String fileType = valueStream.readUTF();
|
|
|
- String fileLengthStr = valueStream.readUTF();
|
|
|
- long fileLength = Long.parseLong(fileLengthStr);
|
|
|
- if (fileType.equalsIgnoreCase(logFile)) {
|
|
|
- StringBuilder sb = new StringBuilder();
|
|
|
- sb.append("LogType:");
|
|
|
- sb.append(fileType + "\n");
|
|
|
- sb.append("Log Upload Time:");
|
|
|
- sb.append(Times.format(System.currentTimeMillis()) + "\n");
|
|
|
- sb.append("LogLength:");
|
|
|
- sb.append(fileLengthStr + "\n");
|
|
|
- sb.append("Log Contents:\n");
|
|
|
- byte[] b = sb.toString().getBytes(Charset.forName("UTF-8"));
|
|
|
- os.write(b, 0, b.length);
|
|
|
-
|
|
|
- long toSkip = 0;
|
|
|
- long totalBytesToRead = fileLength;
|
|
|
- if (bytes < 0) {
|
|
|
- long absBytes = Math.abs(bytes);
|
|
|
- if (absBytes < fileLength) {
|
|
|
- toSkip = fileLength - absBytes;
|
|
|
- totalBytesToRead = absBytes;
|
|
|
+ while (valueStream != null && !key.toString()
|
|
|
+ .equals(containerIdStr)) {
|
|
|
+ // Next container
|
|
|
+ key = new LogKey();
|
|
|
+ valueStream = reader.next(key);
|
|
|
+ }
|
|
|
+ if (valueStream == null) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ while (true) {
|
|
|
+ try {
|
|
|
+ String fileType = valueStream.readUTF();
|
|
|
+ String fileLengthStr = valueStream.readUTF();
|
|
|
+ long fileLength = Long.parseLong(fileLengthStr);
|
|
|
+ if (fileType.equalsIgnoreCase(logFile)) {
|
|
|
+ StringBuilder sb = new StringBuilder();
|
|
|
+ sb.append("LogType:");
|
|
|
+ sb.append(fileType + "\n");
|
|
|
+ sb.append("Log Upload Time:");
|
|
|
+ sb.append(Times.format(System.currentTimeMillis()) + "\n");
|
|
|
+ sb.append("LogLength:");
|
|
|
+ sb.append(fileLengthStr + "\n");
|
|
|
+ sb.append("Log Contents:\n");
|
|
|
+ byte[] b = sb.toString().getBytes(
|
|
|
+ Charset.forName("UTF-8"));
|
|
|
+ os.write(b, 0, b.length);
|
|
|
+
|
|
|
+ long toSkip = 0;
|
|
|
+ long totalBytesToRead = fileLength;
|
|
|
+ if (bytes < 0) {
|
|
|
+ long absBytes = Math.abs(bytes);
|
|
|
+ if (absBytes < fileLength) {
|
|
|
+ toSkip = fileLength - absBytes;
|
|
|
+ totalBytesToRead = absBytes;
|
|
|
+ }
|
|
|
+ long skippedBytes = valueStream.skip(toSkip);
|
|
|
+ if (skippedBytes != toSkip) {
|
|
|
+ throw new IOException("The bytes were skipped are "
|
|
|
+ + "different from the caller requested");
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ if (bytes < fileLength) {
|
|
|
+ totalBytesToRead = bytes;
|
|
|
+ }
|
|
|
}
|
|
|
- long skippedBytes = valueStream.skip(toSkip);
|
|
|
- if (skippedBytes != toSkip) {
|
|
|
- throw new IOException("The bytes were skipped are "
|
|
|
- + "different from the caller requested");
|
|
|
+
|
|
|
+ long curRead = 0;
|
|
|
+ long pendingRead = totalBytesToRead - curRead;
|
|
|
+ int toRead = pendingRead > buf.length ? buf.length
|
|
|
+ : (int) pendingRead;
|
|
|
+ int len = valueStream.read(buf, 0, toRead);
|
|
|
+ while (len != -1 && curRead < totalBytesToRead) {
|
|
|
+ os.write(buf, 0, len);
|
|
|
+ curRead += len;
|
|
|
+
|
|
|
+ pendingRead = totalBytesToRead - curRead;
|
|
|
+ toRead = pendingRead > buf.length ? buf.length
|
|
|
+ : (int) pendingRead;
|
|
|
+ len = valueStream.read(buf, 0, toRead);
|
|
|
}
|
|
|
+ sb = new StringBuilder();
|
|
|
+ sb.append("\nEnd of LogType:" + fileType + "\n");
|
|
|
+ b = sb.toString().getBytes(Charset.forName("UTF-8"));
|
|
|
+ os.write(b, 0, b.length);
|
|
|
+ findLogs = true;
|
|
|
} else {
|
|
|
- if (bytes < fileLength) {
|
|
|
- totalBytesToRead = bytes;
|
|
|
+ long totalSkipped = 0;
|
|
|
+ long currSkipped = 0;
|
|
|
+ while (currSkipped != -1 && totalSkipped < fileLength) {
|
|
|
+ currSkipped = valueStream.skip(
|
|
|
+ fileLength - totalSkipped);
|
|
|
+ totalSkipped += currSkipped;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- long curRead = 0;
|
|
|
- long pendingRead = totalBytesToRead - curRead;
|
|
|
- int toRead = pendingRead > buf.length ? buf.length
|
|
|
- : (int) pendingRead;
|
|
|
- int len = valueStream.read(buf, 0, toRead);
|
|
|
- while (len != -1 && curRead < totalBytesToRead) {
|
|
|
- os.write(buf, 0, len);
|
|
|
- curRead += len;
|
|
|
-
|
|
|
- pendingRead = totalBytesToRead - curRead;
|
|
|
- toRead = pendingRead > buf.length ? buf.length
|
|
|
- : (int) pendingRead;
|
|
|
- len = valueStream.read(buf, 0, toRead);
|
|
|
- }
|
|
|
- sb = new StringBuilder();
|
|
|
- sb.append("\nEnd of LogType:" + fileType + "\n");
|
|
|
- b = sb.toString().getBytes(Charset.forName("UTF-8"));
|
|
|
- os.write(b, 0, b.length);
|
|
|
- findLogs = true;
|
|
|
- } else {
|
|
|
- long totalSkipped = 0;
|
|
|
- long currSkipped = 0;
|
|
|
- while (currSkipped != -1 && totalSkipped < fileLength) {
|
|
|
- currSkipped = valueStream.skip(fileLength - totalSkipped);
|
|
|
- totalSkipped += currSkipped;
|
|
|
- }
|
|
|
+ } catch (EOFException eof) {
|
|
|
+ break;
|
|
|
}
|
|
|
- } catch (EOFException eof) {
|
|
|
- break;
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ if (reader != null) {
|
|
|
+ reader.close();
|
|
|
}
|
|
|
}
|
|
|
}
|