|
@@ -213,7 +213,8 @@ public class AHSWebServices extends WebServices {
|
|
@Context HttpServletResponse res,
|
|
@Context HttpServletResponse res,
|
|
@PathParam("containerid") String containerIdStr,
|
|
@PathParam("containerid") String containerIdStr,
|
|
@PathParam("filename") String filename,
|
|
@PathParam("filename") String filename,
|
|
- @QueryParam("download") String download) {
|
|
|
|
|
|
+ @QueryParam("download") String download,
|
|
|
|
+ @QueryParam("size") String size) {
|
|
init(res);
|
|
init(res);
|
|
ContainerId containerId;
|
|
ContainerId containerId;
|
|
try {
|
|
try {
|
|
@@ -225,6 +226,9 @@ public class AHSWebServices extends WebServices {
|
|
|
|
|
|
boolean downloadFile = parseBooleanParam(download);
|
|
boolean downloadFile = parseBooleanParam(download);
|
|
|
|
|
|
|
|
+
|
|
|
|
+ final long length = parseLongParam(size);
|
|
|
|
+
|
|
ApplicationId appId = containerId.getApplicationAttemptId()
|
|
ApplicationId appId = containerId.getApplicationAttemptId()
|
|
.getApplicationId();
|
|
.getApplicationId();
|
|
AppInfo appInfo;
|
|
AppInfo appInfo;
|
|
@@ -233,7 +237,7 @@ public class AHSWebServices extends WebServices {
|
|
} catch (Exception ex) {
|
|
} catch (Exception ex) {
|
|
// directly find logs from HDFS.
|
|
// directly find logs from HDFS.
|
|
return sendStreamOutputResponse(appId, null, null, containerIdStr,
|
|
return sendStreamOutputResponse(appId, null, null, containerIdStr,
|
|
- filename, downloadFile);
|
|
|
|
|
|
+ filename, downloadFile, length);
|
|
}
|
|
}
|
|
String appOwner = appInfo.getUser();
|
|
String appOwner = appInfo.getUser();
|
|
|
|
|
|
@@ -247,7 +251,7 @@ public class AHSWebServices extends WebServices {
|
|
if (isFinishedState(appInfo.getAppState())) {
|
|
if (isFinishedState(appInfo.getAppState())) {
|
|
// directly find logs from HDFS.
|
|
// directly find logs from HDFS.
|
|
return sendStreamOutputResponse(appId, appOwner, null, containerIdStr,
|
|
return sendStreamOutputResponse(appId, appOwner, null, containerIdStr,
|
|
- filename, downloadFile);
|
|
|
|
|
|
+ filename, downloadFile, length);
|
|
}
|
|
}
|
|
return createBadResponse(Status.INTERNAL_SERVER_ERROR,
|
|
return createBadResponse(Status.INTERNAL_SERVER_ERROR,
|
|
"Can not get ContainerInfo for the container: " + containerId);
|
|
"Can not get ContainerInfo for the container: " + containerId);
|
|
@@ -267,7 +271,7 @@ public class AHSWebServices extends WebServices {
|
|
return response.build();
|
|
return response.build();
|
|
} else if (isFinishedState(appInfo.getAppState())) {
|
|
} else if (isFinishedState(appInfo.getAppState())) {
|
|
return sendStreamOutputResponse(appId, appOwner, nodeId,
|
|
return sendStreamOutputResponse(appId, appOwner, nodeId,
|
|
- containerIdStr, filename, downloadFile);
|
|
|
|
|
|
+ containerIdStr, filename, downloadFile, length);
|
|
} else {
|
|
} else {
|
|
return createBadResponse(Status.NOT_FOUND,
|
|
return createBadResponse(Status.NOT_FOUND,
|
|
"The application is not at Running or Finished State.");
|
|
"The application is not at Running or Finished State.");
|
|
@@ -296,11 +300,11 @@ public class AHSWebServices extends WebServices {
|
|
|
|
|
|
private Response sendStreamOutputResponse(ApplicationId appId,
|
|
private Response sendStreamOutputResponse(ApplicationId appId,
|
|
String appOwner, String nodeId, String containerIdStr,
|
|
String appOwner, String nodeId, String containerIdStr,
|
|
- String fileName, boolean downloadFile) {
|
|
|
|
|
|
+ String fileName, boolean downloadFile, long bytes) {
|
|
StreamingOutput stream = null;
|
|
StreamingOutput stream = null;
|
|
try {
|
|
try {
|
|
stream = getStreamingOutput(appId, appOwner, nodeId,
|
|
stream = getStreamingOutput(appId, appOwner, nodeId,
|
|
- containerIdStr, fileName);
|
|
|
|
|
|
+ containerIdStr, fileName, bytes);
|
|
} catch (Exception ex) {
|
|
} catch (Exception ex) {
|
|
return createBadResponse(Status.INTERNAL_SERVER_ERROR,
|
|
return createBadResponse(Status.INTERNAL_SERVER_ERROR,
|
|
ex.getMessage());
|
|
ex.getMessage());
|
|
@@ -318,7 +322,7 @@ public class AHSWebServices extends WebServices {
|
|
|
|
|
|
private StreamingOutput getStreamingOutput(ApplicationId appId,
|
|
private StreamingOutput getStreamingOutput(ApplicationId appId,
|
|
String appOwner, final String nodeId, final String containerIdStr,
|
|
String appOwner, final String nodeId, final String containerIdStr,
|
|
- final String logFile) throws IOException{
|
|
|
|
|
|
+ final String logFile, final long bytes) throws IOException{
|
|
String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
|
|
String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
|
|
org.apache.hadoop.fs.Path remoteRootLogDir = new org.apache.hadoop.fs.Path(
|
|
org.apache.hadoop.fs.Path remoteRootLogDir = new org.apache.hadoop.fs.Path(
|
|
conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
|
conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
|
@@ -391,16 +395,35 @@ public class AHSWebServices extends WebServices {
|
|
byte[] b = sb.toString().getBytes(Charset.forName("UTF-8"));
|
|
byte[] b = sb.toString().getBytes(Charset.forName("UTF-8"));
|
|
os.write(b, 0, b.length);
|
|
os.write(b, 0, b.length);
|
|
|
|
|
|
|
|
+ long toSkip = 0;
|
|
|
|
+ long totalBytesToRead = fileLength;
|
|
|
|
+ if (bytes < 0) {
|
|
|
|
+ long absBytes = Math.abs(bytes);
|
|
|
|
+ if (absBytes < fileLength) {
|
|
|
|
+ toSkip = fileLength - absBytes;
|
|
|
|
+ totalBytesToRead = absBytes;
|
|
|
|
+ }
|
|
|
|
+ long skippedBytes = valueStream.skip(toSkip);
|
|
|
|
+ if (skippedBytes != toSkip) {
|
|
|
|
+ throw new IOException("The bytes were skipped are "
|
|
|
|
+ + "different from the caller requested");
|
|
|
|
+ }
|
|
|
|
+ } else {
|
|
|
|
+ if (bytes < fileLength) {
|
|
|
|
+ totalBytesToRead = bytes;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
long curRead = 0;
|
|
long curRead = 0;
|
|
- long pendingRead = fileLength - curRead;
|
|
|
|
|
|
+ long pendingRead = totalBytesToRead - curRead;
|
|
int toRead = pendingRead > buf.length ? buf.length
|
|
int toRead = pendingRead > buf.length ? buf.length
|
|
: (int) pendingRead;
|
|
: (int) pendingRead;
|
|
int len = valueStream.read(buf, 0, toRead);
|
|
int len = valueStream.read(buf, 0, toRead);
|
|
- while (len != -1 && curRead < fileLength) {
|
|
|
|
|
|
+ while (len != -1 && curRead < totalBytesToRead) {
|
|
os.write(buf, 0, len);
|
|
os.write(buf, 0, len);
|
|
curRead += len;
|
|
curRead += len;
|
|
|
|
|
|
- pendingRead = fileLength - curRead;
|
|
|
|
|
|
+ pendingRead = totalBytesToRead - curRead;
|
|
toRead = pendingRead > buf.length ? buf.length
|
|
toRead = pendingRead > buf.length ? buf.length
|
|
: (int) pendingRead;
|
|
: (int) pendingRead;
|
|
len = valueStream.read(buf, 0, toRead);
|
|
len = valueStream.read(buf, 0, toRead);
|
|
@@ -433,4 +456,11 @@ public class AHSWebServices extends WebServices {
|
|
};
|
|
};
|
|
return stream;
|
|
return stream;
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ private long parseLongParam(String bytes) {
|
|
|
|
+ if (bytes == null || bytes.isEmpty()) {
|
|
|
|
+ return Long.MAX_VALUE;
|
|
|
|
+ }
|
|
|
|
+ return Long.parseLong(bytes);
|
|
|
|
+ }
|
|
}
|
|
}
|