Forráskód Böngészése

commit 86e9c728e5f669b4337798d83cfba829b314cc36
Author: Rajesh Balamohan <rbala@yahoo-inc.com>
Date: Thu Feb 3 07:38:01 2011 +0530

- Runtime variations in PigMix scripts due to timeouts in maptasks


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1077782 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 14 éve
szülő
commit
e9b32bc071
1 módosított fájl, 22 hozzáadás és 0 törlés
  1. 22 0
      src/mapred/org/apache/hadoop/mapred/Task.java

+ 22 - 0
src/mapred/org/apache/hadoop/mapred/Task.java

@@ -511,6 +511,8 @@ abstract public class Task implements Writable, Configurable {
     private Progress taskProgress;
     private Thread pingThread = null;
     private static final int PROGRESS_STATUS_LEN_LIMIT = 512;
+    private boolean done = true;
+    private Object lock = new Object();
     
     /**
      * flag that indicates whether progress update needs to be sent to parent.
@@ -605,6 +607,9 @@ abstract public class Task implements Writable, Configurable {
       // get current flag value and reset it as well
       boolean sendProgress = resetProgressFlag();
       while (!taskDone.get()) {
+        synchronized(lock) {
+          done = false;
+        }
         try {
           boolean taskFound = true; // whether TT knows about this task
           // sleep for a bit
@@ -637,6 +642,7 @@ abstract public class Task implements Writable, Configurable {
           // came back up), kill ourselves
           if (!taskFound) {
             LOG.warn("Parent died.  Exiting "+taskId);
+            resetDoneFlag();
             System.exit(66);
           }
 
@@ -649,11 +655,22 @@ abstract public class Task implements Writable, Configurable {
           if (remainingRetries == 0) {
             ReflectionUtils.logThreadInfo(LOG, "Communication exception", 0);
             LOG.warn("Last retry, killing "+taskId);
+            resetDoneFlag();
             System.exit(65);
           }
         }
       }
+      //Notify that we are done with the work
+      resetDoneFlag();
+    }
+
+    void resetDoneFlag() {
+      synchronized(lock) {
+        done = true;
+        lock.notify();
+      }
     }
+
     public void startCommunicationThread() {
       if (pingThread == null) {
         pingThread = new Thread(this, "communication thread");
@@ -663,6 +680,11 @@ abstract public class Task implements Writable, Configurable {
     }
     public void stopCommunicationThread() throws InterruptedException {
       if (pingThread != null) {
+        synchronized(lock) {
+          while(!done) {
+            lock.wait();
+          }
+        }
         pingThread.interrupt();
         pingThread.join();
       }