Просмотр исходного кода

Merge -c 1241282 from trunk to branch-0.23 to fix MAPREDUCE-3809. Ensure that there is no needless sleep in Task at the end of the task. Contributed by Siddharth Seth.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1241283 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 13 лет назад
Родитель
Сommit
c6356556be

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -679,6 +679,9 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3354. Changed scripts so that jobhistory server is started by
     bin/mapred instead of bin/yarn. (Jonathan Eagles via acmurthy) 
 
+    MAPREDUCE-3809. Ensure that there is no needless sleep in Task at the end
+    of the task. (sseth via acmurthy)
+
 Release 0.23.0 - 2011-11-01 
 
   INCOMPATIBLE CHANGES

+ 14 - 8
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java

@@ -656,14 +656,13 @@ abstract public class Task implements Writable, Configurable {
         try {
           boolean taskFound = true; // whether TT knows about this task
           // sleep for a bit
-          try {
-            Thread.sleep(PROGRESS_INTERVAL);
-          } 
-          catch (InterruptedException e) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(getTaskID() + " Progress/ping thread exiting " +
-                        "since it got interrupted");
+          synchronized(lock) {
+            if (taskDone.get()) {
+              break;
             }
+            lock.wait(PROGRESS_INTERVAL);
+          }
+          if (taskDone.get()) {
             break;
           }
 
@@ -721,7 +720,14 @@ abstract public class Task implements Writable, Configurable {
     }
     public void stopCommunicationThread() throws InterruptedException {
       if (pingThread != null) {
-        synchronized (lock) {
+        // Intent of the lock is to not send an interupt in the middle of an
+        // umbilical.ping or umbilical.statusUpdate
+        synchronized(lock) {
+        //Interrupt if sleeping. Otherwise wait for the RPC call to return.
+          lock.notify(); 
+        }
+
+        synchronized (lock) { 
           while (!done) {
             lock.wait();
           }