Browse Source

HADOOP-5577. Add a verbose flag to mapreduce.Job.waitForCompletion to get
the running job's information printed to the user's stdout as it runs.
(omalley)


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.20@761049 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 16 years ago
parent
commit
93ae732c1b

+ 4 - 0
CHANGES.txt

@@ -828,6 +828,10 @@ Release 0.20.0 - Unreleased
     updated before the JVM exits. Also makes the update to log.index atomic.
     updated before the JVM exits. Also makes the update to log.index atomic.
     (Ravi Gummadi via ddas)
     (Ravi Gummadi via ddas)
 
 
+    HADOOP-5577. Add a verbose flag to mapreduce.Job.waitForCompletion to get
+    the running job's information printed to the user's stdout as it runs.
+    (omalley)
+
 Release 0.19.2 - Unreleased
 Release 0.19.2 - Unreleased
 
 
   BUG FIXES
   BUG FIXES

+ 1 - 1
src/examples/org/apache/hadoop/examples/SecondarySort.java

@@ -233,7 +233,7 @@ public class SecondarySort {
     
     
     FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
     FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
     FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
     FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
-    System.exit(job.waitForCompletion() ? 0 : 1);
+    System.exit(job.waitForCompletion(true) ? 0 : 1);
   }
   }
 
 
 }
 }

+ 1 - 1
src/examples/org/apache/hadoop/examples/WordCount.java

@@ -64,6 +64,6 @@ public class WordCount {
     job.setOutputValueClass(IntWritable.class);
     job.setOutputValueClass(IntWritable.class);
     FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
     FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
     FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
     FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
-    System.exit(job.waitForCompletion() ? 0 : 1);
+    System.exit(job.waitForCompletion(true) ? 0 : 1);
   }
   }
 }
 }

+ 88 - 107
src/mapred/org/apache/hadoop/mapred/JobClient.java

@@ -1245,126 +1245,107 @@ public class JobClient extends Configured implements MRConstants, Tool  {
    * complete.
    * complete.
    * 
    * 
    * @param job the job configuration.
    * @param job the job configuration.
-   * @throws IOException
+   * @throws IOException if the job fails
    */
    */
   public static RunningJob runJob(JobConf job) throws IOException {
   public static RunningJob runJob(JobConf job) throws IOException {
     JobClient jc = new JobClient(job);
     JobClient jc = new JobClient(job);
-    boolean error = true;
-    RunningJob running = null;
-    String lastReport = null;
-    final int MAX_RETRIES = 5;
-    int retries = MAX_RETRIES;
-    TaskStatusFilter filter;
+    RunningJob rj = jc.submitJob(job);
     try {
     try {
-      filter = getTaskOutputFilter(job);
-    } catch(IllegalArgumentException e) {
-      LOG.warn("Invalid Output filter : " + e.getMessage() + 
-               " Valid values are : NONE, FAILED, SUCCEEDED, ALL");
-      throw e;
+      if (!jc.monitorAndPrintJob(job, rj)) {
+        throw new IOException("Job failed!");
+      }
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
     }
     }
-    try {
-      running = jc.submitJob(job);
-      JobID jobId = running.getID();
-      LOG.info("Running job: " + jobId);
-      int eventCounter = 0;
-      boolean profiling = job.getProfileEnabled();
-      Configuration.IntegerRanges mapRanges = job.getProfileTaskRange(true);
-      Configuration.IntegerRanges reduceRanges = job.getProfileTaskRange(false);
-        
-      while (true) {
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException e) {}
-        try {
-          if (running.isComplete()) {
-            break;
-          }
-          running = jc.getJob(jobId);
-          if (running == null) {
-            throw new IOException("Unable to fetch job status from server.");
-          }
-          String report = 
-            (" map " + StringUtils.formatPercent(running.mapProgress(), 0)+
-             " reduce " + 
-             StringUtils.formatPercent(running.reduceProgress(), 0));
-          if (!report.equals(lastReport)) {
-            LOG.info(report);
-            lastReport = report;
-          }
-            
-          TaskCompletionEvent[] events = 
-            running.getTaskCompletionEvents(eventCounter); 
-          eventCounter += events.length;
-          for(TaskCompletionEvent event : events){
-            TaskCompletionEvent.Status status = event.getTaskStatus();
-            if (profiling && 
-                (status == TaskCompletionEvent.Status.SUCCEEDED ||
-                 status == TaskCompletionEvent.Status.FAILED) &&
+    return rj;
+  }
+  
+  /**
+   * Monitor a job and print status in real-time as progress is made and tasks 
+   * fail.
+   * @param conf the job's configuration
+   * @param job the job to track
+   * @return true if the job succeeded
+   * @throws IOException if communication to the JobTracker fails
+   */
+  public boolean monitorAndPrintJob(JobConf conf, 
+                                    RunningJob job
+  ) throws IOException, InterruptedException {
+    String lastReport = null;
+    TaskStatusFilter filter;
+    filter = getTaskOutputFilter(conf);
+    JobID jobId = job.getID();
+    LOG.info("Running job: " + jobId);
+    int eventCounter = 0;
+    boolean profiling = conf.getProfileEnabled();
+    Configuration.IntegerRanges mapRanges = conf.getProfileTaskRange(true);
+    Configuration.IntegerRanges reduceRanges = conf.getProfileTaskRange(false);
+
+    while (!job.isComplete()) {
+      Thread.sleep(1000);
+      String report = 
+        (" map " + StringUtils.formatPercent(job.mapProgress(), 0)+
+            " reduce " + 
+            StringUtils.formatPercent(job.reduceProgress(), 0));
+      if (!report.equals(lastReport)) {
+        LOG.info(report);
+        lastReport = report;
+      }
+
+      TaskCompletionEvent[] events = 
+        job.getTaskCompletionEvents(eventCounter); 
+      eventCounter += events.length;
+      for(TaskCompletionEvent event : events){
+        TaskCompletionEvent.Status status = event.getTaskStatus();
+        if (profiling && 
+            (status == TaskCompletionEvent.Status.SUCCEEDED ||
+                status == TaskCompletionEvent.Status.FAILED) &&
                 (event.isMap ? mapRanges : reduceRanges).
                 (event.isMap ? mapRanges : reduceRanges).
-                   isIncluded(event.idWithinJob())) {
-              downloadProfile(event);
-            }
-            switch(filter){
-            case NONE:
-              break;
-            case SUCCEEDED:
-              if (event.getTaskStatus() == 
-                TaskCompletionEvent.Status.SUCCEEDED){
-                LOG.info(event.toString());
-                displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
-              }
-              break; 
-            case FAILED:
-              if (event.getTaskStatus() == 
-                TaskCompletionEvent.Status.FAILED){
-                LOG.info(event.toString());
-                // Displaying the task diagnostic information
-                TaskAttemptID taskId = event.getTaskAttemptId();
-                String[] taskDiagnostics = 
-                  jc.jobSubmitClient.getTaskDiagnostics(taskId); 
-                if (taskDiagnostics != null) {
-                  for(String diagnostics : taskDiagnostics){
-                    System.err.println(diagnostics);
-                  }
-                }
-                // Displaying the task logs
-                displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
-              }
-              break; 
-            case KILLED:
-              if (event.getTaskStatus() == TaskCompletionEvent.Status.KILLED){
-                LOG.info(event.toString());
+                isIncluded(event.idWithinJob())) {
+          downloadProfile(event);
+        }
+        switch(filter){
+        case NONE:
+          break;
+        case SUCCEEDED:
+          if (event.getTaskStatus() == 
+            TaskCompletionEvent.Status.SUCCEEDED){
+            LOG.info(event.toString());
+            displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
+          }
+          break; 
+        case FAILED:
+          if (event.getTaskStatus() == 
+            TaskCompletionEvent.Status.FAILED){
+            LOG.info(event.toString());
+            // Displaying the task diagnostic information
+            TaskAttemptID taskId = event.getTaskAttemptId();
+            String[] taskDiagnostics = 
+              jobSubmitClient.getTaskDiagnostics(taskId); 
+            if (taskDiagnostics != null) {
+              for(String diagnostics : taskDiagnostics){
+                System.err.println(diagnostics);
               }
               }
-              break; 
-            case ALL:
-              LOG.info(event.toString());
-              displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
-              break;
             }
             }
+            // Displaying the task logs
+            displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
           }
           }
-          retries = MAX_RETRIES;
-        } catch (IOException ie) {
-          if (--retries == 0) {
-            LOG.warn("Final attempt failed, killing job.");
-            throw ie;
+          break; 
+        case KILLED:
+          if (event.getTaskStatus() == TaskCompletionEvent.Status.KILLED){
+            LOG.info(event.toString());
           }
           }
-          LOG.info("Communication problem with server: " +
-                   StringUtils.stringifyException(ie));
+          break; 
+        case ALL:
+          LOG.info(event.toString());
+          displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
+          break;
         }
         }
       }
       }
-      if (!running.isSuccessful()) {
-        throw new IOException("Job failed!");
-      }
-      LOG.info("Job complete: " + jobId);
-      running.getCounters().log(LOG);
-      error = false;
-    } finally {
-      if (error && (running != null)) {
-        running.killJob();
-      }
-      jc.close();
     }
     }
-    return running;
+    LOG.info("Job complete: " + jobId);
+    job.getCounters().log(LOG);
+    return job.isSuccessful();
   }
   }
 
 
   static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
   static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {

+ 12 - 5
src/mapred/org/apache/hadoop/mapreduce/Job.java

@@ -38,7 +38,7 @@ import org.apache.hadoop.mapred.TaskCompletionEvent;
 public class Job extends JobContext {  
 public class Job extends JobContext {  
   public static enum JobState {DEFINE, RUNNING};
   public static enum JobState {DEFINE, RUNNING};
   private JobState state = JobState.DEFINE;
   private JobState state = JobState.DEFINE;
-  private JobClient jobTracker;
+  private JobClient jobClient;
   private RunningJob info;
   private RunningJob info;
 
 
   public Job() throws IOException {
   public Job() throws IOException {
@@ -47,7 +47,7 @@ public class Job extends JobContext {
 
 
   public Job(Configuration conf) throws IOException {
   public Job(Configuration conf) throws IOException {
     super(conf, null);
     super(conf, null);
-    jobTracker = new JobClient((JobConf) getConfiguration());
+    jobClient = new JobClient((JobConf) getConfiguration());
   }
   }
 
 
   public Job(Configuration conf, String jobName) throws IOException {
   public Job(Configuration conf, String jobName) throws IOException {
@@ -429,22 +429,29 @@ public class Job extends JobContext {
                               ClassNotFoundException {
                               ClassNotFoundException {
     ensureState(JobState.DEFINE);
     ensureState(JobState.DEFINE);
     setUseNewAPI();
     setUseNewAPI();
-    info = jobTracker.submitJobInternal(conf);
+    info = jobClient.submitJobInternal(conf);
     state = JobState.RUNNING;
     state = JobState.RUNNING;
    }
    }
   
   
   /**
   /**
    * Submit the job to the cluster and wait for it to finish.
    * Submit the job to the cluster and wait for it to finish.
+   * @param verbose print the progress to the user
    * @return true if the job succeeded
    * @return true if the job succeeded
    * @throws IOException thrown if the communication with the 
    * @throws IOException thrown if the communication with the 
    *         <code>JobTracker</code> is lost
    *         <code>JobTracker</code> is lost
    */
    */
-  public boolean waitForCompletion() throws IOException, InterruptedException,
+  public boolean waitForCompletion(boolean verbose
+                                   ) throws IOException, InterruptedException,
                                             ClassNotFoundException {
                                             ClassNotFoundException {
     if (state == JobState.DEFINE) {
     if (state == JobState.DEFINE) {
       submit();
       submit();
     }
     }
-    info.waitForCompletion();
+    if (verbose) {
+      jobClient.monitorAndPrintJob(conf, info);
+    } else {
+      info.waitForCompletion();
+    }
     return isSuccessful();
     return isSuccessful();
   }
   }
+  
 }
 }

+ 2 - 2
src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java

@@ -111,7 +111,7 @@ public class TestMapReduceLocal extends TestCase {
     job.setOutputValueClass(IntWritable.class);
     job.setOutputValueClass(IntWritable.class);
     FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
     FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
     FileOutputFormat.setOutputPath(job, new Path(TEST_ROOT_DIR + "/out"));
     FileOutputFormat.setOutputPath(job, new Path(TEST_ROOT_DIR + "/out"));
-    assertTrue(job.waitForCompletion());
+    assertTrue(job.waitForCompletion(false));
     String out = readFile("out/part-r-00000");
     String out = readFile("out/part-r-00000");
     System.out.println(out);
     System.out.println(out);
     assertEquals("a\t1\ncount\t1\nis\t1\nmore\t1\nof\t1\ntest\t4\nthis\t1\nword\t1\n",
     assertEquals("a\t1\ncount\t1\nis\t1\nmore\t1\nof\t1\ntest\t4\nthis\t1\nword\t1\n",
@@ -156,7 +156,7 @@ public class TestMapReduceLocal extends TestCase {
     
     
     FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
     FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
     FileOutputFormat.setOutputPath(job, new Path(TEST_ROOT_DIR + "/out"));
     FileOutputFormat.setOutputPath(job, new Path(TEST_ROOT_DIR + "/out"));
-    assertTrue(job.waitForCompletion());
+    assertTrue(job.waitForCompletion(true));
     String out = readFile("out/part-r-00000");
     String out = readFile("out/part-r-00000");
     assertEquals("------------------------------------------------\n" +
     assertEquals("------------------------------------------------\n" +
                  "-3\t23\n" +
                  "-3\t23\n" +