浏览代码

HADOOP-3155. Ensure that there is only one thread fetching TaskCompletionEvents on TaskTracker re-init. Contributed by Dhruba Borthakur.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@705391 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 16 年之前
父节点
当前提交
0a1638ebeb
共有 2 个文件被更改,包括 18 次插入2 次删除
  1. 4 0
      CHANGES.txt
  2. 14 2
      src/mapred/org/apache/hadoop/mapred/TaskTracker.java

+ 4 - 0
CHANGES.txt

@@ -943,6 +943,10 @@ Release 0.19.0 - Unreleased
     HADOOP-4418. Updates documentation in forrest for Mapred, streaming and pipes.
     (Amareshwari Sriramadasu via ddas)
 
+    HADOOP-3155. Ensure that there is only one thread fetching 
+    TaskCompletionEvents on TaskTracker re-init. (Dhruba Borthakur via
+    acmurthy) 
+
 Release 0.18.2 - Unreleased
 
   BUG FIXES

+ 14 - 2
src/mapred/org/apache/hadoop/mapred/TaskTracker.java

@@ -118,7 +118,7 @@ public class TaskTracker
   public static final Log ClientTraceLog =
     LogFactory.getLog(TaskTracker.class.getName() + ".clienttrace");
 
-  private boolean running = true;
+  volatile boolean running = true;
 
   private LocalDirAllocator localDirAllocator;
   String taskTrackerName;
@@ -553,7 +553,7 @@ public class TaskTracker
     public void run() {
       LOG.info("Starting thread: " + this.getName());
         
-      while (true) {
+      while (running) {
         try {
           List <FetchStatus> fList = null;
           synchronized (runningJobs) {
@@ -584,6 +584,9 @@ public class TaskTracker
                        " events threw for " + f.jobId + " threw: " +
                        StringUtils.stringifyException(e)); 
             }
+            if (!running) {
+              break;
+            }
           }
           synchronized (waitingOn) {
             try {
@@ -867,6 +870,15 @@ public class TaskTracker
     
     // shutdown RPC connections
     RPC.stopProxy(jobClient);
+
+    // wait for the fetcher thread to exit
+    for (boolean done = false; !done; ) {
+      try {
+        this.mapEventsFetcher.join();
+        done = true;
+      } catch (InterruptedException e) {
+      }
+    }
   }
 
   /**