Browse Source

HADOOP-1651. Improve progress reporting. Contributed by Devaraj Das.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@563649 13f79535-47bb-0310-9956-ffa450edef68
Thomas White 18 years ago
parent
commit
790e124080

+ 5 - 0
CHANGES.txt

@@ -32,6 +32,11 @@ Trunk (unreleased changes)
     HADOOP-1463.  HDFS report correct usage statistics for disk space
     used by HDFS.  (Hairong Kuang via dhruba)
 
+  IMPROVEMENTS
+
+    HADOOP-1651.  Improve progress reporting.
+    (Devaraj Das via tomwhite)
+
 Branch 0.14 (unreleased changes)
 
   1. HADOOP-1197.  In Configuration, deprecate getObject() and add

+ 18 - 10
src/java/org/apache/hadoop/mapred/Task.java

@@ -217,7 +217,7 @@ abstract class Task implements Writable, Configurable {
                                           ) throws IOException;
 
   /** The number of milliseconds between progress reports. */
-  public static final int PROGRESS_INTERVAL = 1000;
+  public static final int PROGRESS_INTERVAL = 3000;
 
   private transient Progress taskProgress = new Progress();
 
@@ -230,6 +230,8 @@ abstract class Task implements Writable, Configurable {
    * Using AtomicBoolean since we need an atomic read & reset method. 
    */  
   private AtomicBoolean progressFlag = new AtomicBoolean(false);
+  /* flag to track whether task is done */
+  private AtomicBoolean taskDone = new AtomicBoolean(false);
   // getters and setters for flag
   private void setProgressFlag() {
     progressFlag.set(true);
@@ -256,11 +258,20 @@ abstract class Task implements Writable, Configurable {
         public void run() {
           final int MAX_RETRIES = 3;
           int remainingRetries = MAX_RETRIES;
-          while (true) {
+          // get current flag value and reset it as well
+          boolean sendProgress = resetProgressFlag();
+          while (!taskDone.get()) {
             try {
-              // get current flag value and reset it as well
-              boolean sendProgress = resetProgressFlag();
               boolean taskFound = true; // whether TT knows about this task
+              // sleep for a bit
+              try {
+                Thread.sleep(PROGRESS_INTERVAL);
+              } 
+              catch (InterruptedException e) {
+                LOG.debug(getTaskId() + " Progress/ping thread exiting " +
+                                        "since it got interrupted");
+                break;
+              }
               
               if (sendProgress) {
                 // we need to send progress update
@@ -279,13 +290,8 @@ abstract class Task implements Writable, Configurable {
                 System.exit(66);
               }
               
+              sendProgress = resetProgressFlag(); 
               remainingRetries = MAX_RETRIES;
-              // sleep for a bit
-              try {
-                Thread.sleep(PROGRESS_INTERVAL);
-              } 
-              catch (InterruptedException e) {
-              }
             } 
             catch (Throwable t) {
               LOG.info("Communication exception: " + StringUtils.stringifyException(t));
@@ -301,6 +307,7 @@ abstract class Task implements Writable, Configurable {
       }, "Comm thread for "+taskId);
     thread.setDaemon(true);
     thread.start();
+    LOG.debug(getTaskId() + " Progress/ping thread started");
   }
 
   
@@ -338,6 +345,7 @@ abstract class Task implements Writable, Configurable {
   public void done(TaskUmbilicalProtocol umbilical) throws IOException {
     int retries = 10;
     boolean needProgress = true;
+    taskDone.set(true);
     while (true) {
       try {
         if (needProgress) {

+ 2 - 0
src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -1776,6 +1776,8 @@ public class TaskTracker
       JobConf defaultConf = new JobConf();
       int port = Integer.parseInt(args[0]);
       String taskid = args[1];
+      //set a very high idle timeout so that the connection is never closed
+      defaultConf.setInt("ipc.client.connection.maxidletime", 60*60*1000);
       TaskUmbilicalProtocol umbilical =
         (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
                                             TaskUmbilicalProtocol.versionID,