|
@@ -17,7 +17,6 @@
|
|
*/
|
|
*/
|
|
package org.apache.hadoop.mapred;
|
|
package org.apache.hadoop.mapred;
|
|
|
|
|
|
-import org.apache.commons.cli2.validation.InvalidArgumentException;
|
|
|
|
import org.apache.commons.logging.*;
|
|
import org.apache.commons.logging.*;
|
|
|
|
|
|
import org.apache.hadoop.fs.*;
|
|
import org.apache.hadoop.fs.*;
|
|
@@ -39,6 +38,8 @@ import java.util.*;
|
|
*******************************************************/
|
|
*******************************************************/
|
|
public class JobClient extends ToolBase implements MRConstants {
|
|
public class JobClient extends ToolBase implements MRConstants {
|
|
private static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.JobClient");
|
|
private static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.JobClient");
|
|
|
|
+ public static enum TaskStatusFilter { NONE, FAILED, SUCCEEDED, ALL };
|
|
|
|
+ private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED;
|
|
|
|
|
|
static long MAX_JOBPROFILE_AGE = 1000 * 2;
|
|
static long MAX_JOBPROFILE_AGE = 1000 * 2;
|
|
|
|
|
|
@@ -150,6 +151,14 @@ public class JobClient extends ToolBase implements MRConstants {
|
|
public synchronized void killJob() throws IOException {
|
|
public synchronized void killJob() throws IOException {
|
|
jobSubmitClient.killJob(getJobID());
|
|
jobSubmitClient.killJob(getJobID());
|
|
}
|
|
}
|
|
|
|
+ /**
|
|
|
|
+ * Fetch task completion events from jobtracker for this job.
|
|
|
|
+ */
|
|
|
|
+ public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
|
|
|
|
+ int startFrom) throws IOException{
|
|
|
|
+ return jobSubmitClient.getTaskCompletionEvents(
|
|
|
|
+ getJobID(), startFrom);
|
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
/**
|
|
* Dump stats to screen
|
|
* Dump stats to screen
|
|
@@ -367,10 +376,22 @@ public class JobClient extends ToolBase implements MRConstants {
|
|
String lastReport = null;
|
|
String lastReport = null;
|
|
final int MAX_RETRIES = 5;
|
|
final int MAX_RETRIES = 5;
|
|
int retries = MAX_RETRIES;
|
|
int retries = MAX_RETRIES;
|
|
|
|
+ String outputFilterName = job.get("jobclient.output.filter");
|
|
|
|
+
|
|
|
|
+ if (null != outputFilterName) {
|
|
|
|
+ try {
|
|
|
|
+ jc.setTaskOutputFilter(TaskStatusFilter.valueOf(outputFilterName));
|
|
|
|
+ } catch(IllegalArgumentException e) {
|
|
|
|
+ LOG.warn("Invalid Output filter : " + outputFilterName +
|
|
|
|
+ " Valid values are : NONE, FAILED, SUCCEEDED, ALL");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
try {
|
|
try {
|
|
running = jc.submitJob(job);
|
|
running = jc.submitJob(job);
|
|
String jobId = running.getJobID();
|
|
String jobId = running.getJobID();
|
|
LOG.info("Running job: " + jobId);
|
|
LOG.info("Running job: " + jobId);
|
|
|
|
+ int eventCounter = 0 ;
|
|
|
|
+
|
|
while (true) {
|
|
while (true) {
|
|
try {
|
|
try {
|
|
Thread.sleep(1000);
|
|
Thread.sleep(1000);
|
|
@@ -388,6 +409,34 @@ public class JobClient extends ToolBase implements MRConstants {
|
|
LOG.info(report);
|
|
LOG.info(report);
|
|
lastReport = report;
|
|
lastReport = report;
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ if( jc.getTaskOutputFilter() != TaskStatusFilter.NONE){
|
|
|
|
+ TaskCompletionEvent[] events =
|
|
|
|
+ running.getTaskCompletionEvents(eventCounter);
|
|
|
|
+ eventCounter += events.length ;
|
|
|
|
+ for(TaskCompletionEvent event : events ){
|
|
|
|
+ switch( jc.getTaskOutputFilter() ){
|
|
|
|
+ case SUCCEEDED:
|
|
|
|
+ if( event.getTaskStatus() ==
|
|
|
|
+ TaskCompletionEvent.Status.SUCCEEDED){
|
|
|
|
+ LOG.info(event.toString());
|
|
|
|
+ printHttpFile(event.getTaskTrackerHttp());
|
|
|
|
+ }
|
|
|
|
+ break;
|
|
|
|
+ case FAILED:
|
|
|
|
+ if( event.getTaskStatus() ==
|
|
|
|
+ TaskCompletionEvent.Status.FAILED){
|
|
|
|
+ LOG.info(event.toString());
|
|
|
|
+ printHttpFile(event.getTaskTrackerHttp());
|
|
|
|
+ }
|
|
|
|
+ break ;
|
|
|
|
+ case ALL:
|
|
|
|
+ LOG.info(event.toString());
|
|
|
|
+ printHttpFile(event.getTaskTrackerHttp());
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
retries = MAX_RETRIES;
|
|
retries = MAX_RETRIES;
|
|
} catch (IOException ie) {
|
|
} catch (IOException ie) {
|
|
if (--retries == 0) {
|
|
if (--retries == 0) {
|
|
@@ -410,6 +459,35 @@ public class JobClient extends ToolBase implements MRConstants {
|
|
jc.close();
|
|
jc.close();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ static void printHttpFile(String httpURL ) throws IOException {
|
|
|
|
+ boolean good = false;
|
|
|
|
+ long totalBytes = 0;
|
|
|
|
+ URL path = new URL(httpURL);
|
|
|
|
+ try {
|
|
|
|
+ URLConnection connection = path.openConnection();
|
|
|
|
+ InputStream input = connection.getInputStream();
|
|
|
|
+ try {
|
|
|
|
+ byte[] buffer = new byte[64 * 1024];
|
|
|
|
+ int len = input.read(buffer);
|
|
|
|
+ while (len > 0) {
|
|
|
|
+ totalBytes += len;
|
|
|
|
+ LOG.info(new String(buffer).trim());
|
|
|
|
+ len = input.read(buffer);
|
|
|
|
+ }
|
|
|
|
+ good = ((int) totalBytes) == connection.getContentLength();
|
|
|
|
+ if (!good) {
|
|
|
|
+ LOG.warn("Incomplete task output received for " + path +
|
|
|
|
+ " (" + totalBytes + " instead of " +
|
|
|
|
+ connection.getContentLength() + ")");
|
|
|
|
+ }
|
|
|
|
+ }finally {
|
|
|
|
+ input.close();
|
|
|
|
+ }
|
|
|
|
+ }catch(IOException ioe){
|
|
|
|
+ LOG.warn("Error reading task output" + ioe.getMessage());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
|
|
static Configuration getConfiguration(String jobTrackerSpec)
|
|
static Configuration getConfiguration(String jobTrackerSpec)
|
|
{
|
|
{
|
|
@@ -428,8 +506,23 @@ public class JobClient extends ToolBase implements MRConstants {
|
|
}
|
|
}
|
|
return conf;
|
|
return conf;
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Sets the output filter for tasks. only those tasks are printed whose
|
|
|
|
+ * output matches the filter.
|
|
|
|
+ * @param newValue task filter.
|
|
|
|
+ */
|
|
|
|
+ public void setTaskOutputFilter(TaskStatusFilter newValue){
|
|
|
|
+ this.taskOutputFilter = newValue ;
|
|
|
|
+ }
|
|
|
|
+ /**
|
|
|
|
+ * Returns task output filter.
|
|
|
|
+ * @return task filter.
|
|
|
|
+ */
|
|
|
|
+ public TaskStatusFilter getTaskOutputFilter(){
|
|
|
|
+ return this.taskOutputFilter;
|
|
|
|
+ }
|
|
|
|
+
|
|
public int run(String[] argv) throws Exception {
|
|
public int run(String[] argv) throws Exception {
|
|
if (argv.length < 2) {
|
|
if (argv.length < 2) {
|
|
System.out.println("JobClient -submit <job> | -status <id> | -kill <id> [-jt <jobtracker:port>|<config>]");
|
|
System.out.println("JobClient -submit <job> | -status <id> | -kill <id> [-jt <jobtracker:port>|<config>]");
|