Przeglądaj źródła

MAPREDUCE-4164. Fix Communication exception thrown after task completion. Contributed by Mayank Bansal.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.22@1329486 13f79535-47bb-0310-9956-ffa450edef68
Konstantin Shvachko 13 lat temu
rodzic
commit
0037e81db4

+ 3 - 0
mapreduce/CHANGES.txt

@@ -23,6 +23,9 @@ Release 0.22.1 - Unreleased
     MAPREDUCE-3837. Job tracker is not able to recover jobs after crash.
     (Mayank Bansal via shv) 
 
+    MAPREDUCE-4164. Fix Communication exception thrown after task completion.
+    (Mayank Bansal via shv) 
+
 Release 0.22.0 - 2011-11-29
 
   INCOMPATIBLE CHANGES

+ 22 - 0
mapreduce/src/java/org/apache/hadoop/mapred/Task.java

@@ -536,6 +536,8 @@ abstract public class Task implements Writable, Configurable {
     private InputSplit split = null;
     private Progress taskProgress;
     private Thread pingThread = null;
+    private boolean done = true;
+    private Object lock = new Object();
 
     /**
      * flag that indicates whether progress update needs to be sent to parent.
@@ -627,6 +629,9 @@ abstract public class Task implements Writable, Configurable {
       // get current flag value and reset it as well
       boolean sendProgress = resetProgressFlag();
       while (!taskDone.get()) {
+    	  synchronized (lock) {
+          done = false;
+    	}
         try {
           boolean taskFound = true; // whether TT knows about this task
           // sleep for a bit
@@ -659,6 +664,7 @@ abstract public class Task implements Writable, Configurable {
           // came back up), kill ourselves
           if (!taskFound) {
             LOG.warn("Parent died.  Exiting "+taskId);
+            resetDoneFlag();
             System.exit(66);
           }
 
@@ -671,11 +677,22 @@ abstract public class Task implements Writable, Configurable {
           if (remainingRetries == 0) {
             ReflectionUtils.logThreadInfo(LOG, "Communication exception", 0);
             LOG.warn("Last retry, killing "+taskId);
+            resetDoneFlag();
             System.exit(65);
           }
         }
       }
+      //Notify that we are done with the work
+      resetDoneFlag();
     }
+    
+    void resetDoneFlag() { // marks the communication loop as finished and signals a waiter
+      synchronized (lock) { // same monitor that stopCommunicationThread() waits on
+        done = true; // lets the while (!done) wait loop in stopCommunicationThread() exit
+        lock.notify(); // wakes a single thread blocked in lock.wait()
+      }
+    }
+    
     public void startCommunicationThread() {
       if (pingThread == null) {
         pingThread = new Thread(this, "communication thread");
@@ -685,6 +702,11 @@ abstract public class Task implements Writable, Configurable {
     }
     public void stopCommunicationThread() throws InterruptedException {
       if (pingThread != null) {
+        synchronized (lock) {
+          while (!done) {
+            lock.wait();
+          }
+        }
         pingThread.interrupt();
         pingThread.join();
       }