|
@@ -384,7 +384,7 @@ public class JobClient extends ToolBase implements MRConstants {
|
|
String lastReport = null;
|
|
String lastReport = null;
|
|
final int MAX_RETRIES = 5;
|
|
final int MAX_RETRIES = 5;
|
|
int retries = MAX_RETRIES;
|
|
int retries = MAX_RETRIES;
|
|
- String outputFilterName = job.get("jobclient.output.filter");
|
|
|
|
|
|
+ String outputFilterName = job.get("jobclient.output.filter", "FAILED");
|
|
|
|
|
|
if (null != outputFilterName) {
|
|
if (null != outputFilterName) {
|
|
try {
|
|
try {
|
|
@@ -428,19 +428,19 @@ public class JobClient extends ToolBase implements MRConstants {
|
|
if( event.getTaskStatus() ==
|
|
if( event.getTaskStatus() ==
|
|
TaskCompletionEvent.Status.SUCCEEDED){
|
|
TaskCompletionEvent.Status.SUCCEEDED){
|
|
LOG.info(event.toString());
|
|
LOG.info(event.toString());
|
|
- printHttpFile(event.getTaskTrackerHttp());
|
|
|
|
|
|
+ displayTaskLogs(event.getTaskId(), event.getTaskTrackerHttp());
|
|
}
|
|
}
|
|
break;
|
|
break;
|
|
case FAILED:
|
|
case FAILED:
|
|
if( event.getTaskStatus() ==
|
|
if( event.getTaskStatus() ==
|
|
TaskCompletionEvent.Status.FAILED){
|
|
TaskCompletionEvent.Status.FAILED){
|
|
LOG.info(event.toString());
|
|
LOG.info(event.toString());
|
|
- printHttpFile(event.getTaskTrackerHttp());
|
|
|
|
|
|
+ displayTaskLogs(event.getTaskId(), event.getTaskTrackerHttp());
|
|
}
|
|
}
|
|
break ;
|
|
break ;
|
|
case ALL:
|
|
case ALL:
|
|
LOG.info(event.toString());
|
|
LOG.info(event.toString());
|
|
- printHttpFile(event.getTaskTrackerHttp());
|
|
|
|
|
|
+ displayTaskLogs(event.getTaskId(), event.getTaskTrackerHttp());
|
|
break;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -467,29 +467,33 @@ public class JobClient extends ToolBase implements MRConstants {
|
|
jc.close();
|
|
jc.close();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ private static void displayTaskLogs(String taskId, String baseUrl)
|
|
|
|
+ throws IOException {
|
|
|
|
+ // Copy tasks's stdout of the JobClient
|
|
|
|
+ getTaskLogs(taskId, new URL(baseUrl+"&filter=stdout"), System.out);
|
|
|
|
+
|
|
|
|
+ // Copy task's stderr to stderr of the JobClient
|
|
|
|
+ getTaskLogs(taskId, new URL(baseUrl+"&filter=stderr"), System.err);
|
|
|
|
+ }
|
|
|
|
|
|
- static void printHttpFile(String httpURL ) throws IOException {
|
|
|
|
- boolean good = false;
|
|
|
|
- long totalBytes = 0;
|
|
|
|
- URL path = new URL(httpURL);
|
|
|
|
|
|
+ private static void getTaskLogs(String taskId, URL taskLogUrl,
|
|
|
|
+ OutputStream out) {
|
|
try {
|
|
try {
|
|
- URLConnection connection = path.openConnection();
|
|
|
|
- InputStream input = connection.getInputStream();
|
|
|
|
|
|
+ URLConnection connection = taskLogUrl.openConnection();
|
|
|
|
+ BufferedReader input =
|
|
|
|
+ new BufferedReader(new InputStreamReader(connection.getInputStream()));
|
|
|
|
+ BufferedWriter output =
|
|
|
|
+ new BufferedWriter(new OutputStreamWriter(out));
|
|
try {
|
|
try {
|
|
- byte[] buffer = new byte[64 * 1024];
|
|
|
|
- int len = input.read(buffer);
|
|
|
|
- while (len > 0) {
|
|
|
|
- totalBytes += len;
|
|
|
|
- LOG.info(new String(buffer).trim());
|
|
|
|
- len = input.read(buffer);
|
|
|
|
- }
|
|
|
|
- good = ((int) totalBytes) == connection.getContentLength();
|
|
|
|
- if (!good) {
|
|
|
|
- LOG.warn("Incomplete task output received for " + path +
|
|
|
|
- " (" + totalBytes + " instead of " +
|
|
|
|
- connection.getContentLength() + ")");
|
|
|
|
|
|
+ String logData = null;
|
|
|
|
+ while ((logData = input.readLine()) != null) {
|
|
|
|
+ if (logData.length() > 0) {
|
|
|
|
+ output.write(taskId + ": " + logData + "\n");
|
|
|
|
+ output.flush();
|
|
|
|
+ }
|
|
}
|
|
}
|
|
- }finally {
|
|
|
|
|
|
+ } finally {
|
|
input.close();
|
|
input.close();
|
|
}
|
|
}
|
|
}catch(IOException ioe){
|
|
}catch(IOException ioe){
|