浏览代码

Merge -r 712939:712940 from trunk onto 0.19 branch. Fixes HADOOP-4595.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.19@712941 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 16 年之前
父节点
当前提交
56a9a915ad

+ 3 - 0
CHANGES.txt

@@ -968,6 +968,9 @@ Release 0.19.0 - Unreleased
     HADOOP-4282. Some user facing URLs are not filtered by user filters.
     (szetszwo)
 
+    HADOOP-4595. Fixes two race conditions: one in updating the free slot count,
+    and another in starting the MapEventsFetcher thread. (ddas)
+
 Release 0.18.3 - Unreleased
 
   BUG FIXES

+ 9 - 1
src/mapred/org/apache/hadoop/mapred/JvmManager.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.mapred;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -177,7 +178,14 @@ class JvmManager {
     }
     
     synchronized public void stop() {
-      for (JvmRunner jvm : jvmIdToRunner.values()) {
+      // Since the kill() method invoked below removes an
+      // entry from the jvmIdToRunner map, we create a copy
+      // of the values and iterate over that copy. (Without
+      // the copy, we would encounter a
+      // ConcurrentModificationException.)
+      List <JvmRunner> list = new ArrayList<JvmRunner>();
+      list.addAll(jvmIdToRunner.values());
+      for (JvmRunner jvm : list) {
         jvm.kill();
       }
     }

+ 0 - 5
src/mapred/org/apache/hadoop/mapred/TaskRunner.java

@@ -459,11 +459,6 @@ abstract class TaskRunner extends Thread {
         LOG.warn("Error releasing caches : Cache files might not have been cleaned up");
       }
       tracker.reportTaskFinished(t.getTaskID(), false);
-      if (t.isMapTask()) {
-        tracker.addFreeMapSlot();
-      } else {
-        tracker.addFreeReduceSlot();
-      }
     }
   }
   

+ 28 - 39
src/mapred/org/apache/hadoop/mapred/TaskTracker.java

@@ -461,7 +461,7 @@ public class TaskTracker
       RPC.waitForProxy(InterTrackerProtocol.class,
                        InterTrackerProtocol.versionID, 
                        jobTrackAddr, this.fConf);
-        
+    this.running = true;    
     // start the thread that will fetch map task completion events
     this.mapEventsFetcher = new MapEventsFetcherThread();
     mapEventsFetcher.setDaemon(true);
@@ -484,7 +484,6 @@ public class TaskTracker
     reduceLauncher = new TaskLauncher(maxCurrentReduceTasks);
     mapLauncher.start();
     reduceLauncher.start();
-    this.running = true;
   }
   
   public static Class<? extends TaskTrackerInstrumentation> getInstrumentationClass(Configuration conf) {
@@ -781,20 +780,8 @@ public class TaskTracker
 
   private void launchTaskForJob(TaskInProgress tip, JobConf jobConf) throws IOException{
     synchronized (tip) {
-      try {
-        tip.setJobConf(jobConf);
-        tip.launchTask();
-      } catch (Throwable ie) {
-        tip.taskStatus.setRunState(TaskStatus.State.FAILED);
-        try {
-          tip.cleanup(true);
-        } catch (Throwable ie2) {
-          // Ignore it, we are just trying to cleanup.
-        }
-        String error = StringUtils.stringifyException(ie);
-        tip.reportDiagnosticInfo(error);
-        LOG.info(error);
-      }
+      tip.setJobConf(jobConf);
+      tip.launchTask();
     }
   }
     
@@ -851,8 +838,6 @@ public class TaskTracker
     this.mapEventsFetcher.interrupt();
     
     //stop the launchers
-    mapLauncher.cleanTaskQueue();
-    reduceLauncher.cleanTaskQueue();
     this.mapLauncher.interrupt();
     this.reduceLauncher.interrupt();
     
@@ -1539,14 +1524,6 @@ public class TaskTracker
   public JvmManager getJvmManagerInstance() {
     return jvmManager;
   }
-
-  public void addFreeMapSlot() {
-    mapLauncher.addFreeSlot();
-  }
-  
-  public void addFreeReduceSlot() {
-    reduceLauncher.addFreeSlot();
-  }
   
   private void addToTaskQueue(LaunchTaskAction action) {
     if (action.getTask().isMapTask()) {
@@ -1571,7 +1548,7 @@ public class TaskTracker
 
     public void addToTaskQueue(LaunchTaskAction action) {
       synchronized (tasksToLaunch) {
-        TaskInProgress tip = registerTask(action);
+        TaskInProgress tip = registerTask(action, this);
         tasksToLaunch.add(tip);
         tasksToLaunch.notifyAll();
       }
@@ -1632,10 +1609,11 @@ public class TaskTracker
       }
     }
   }
-  private TaskInProgress registerTask(LaunchTaskAction action) {
+  private TaskInProgress registerTask(LaunchTaskAction action, 
+      TaskLauncher launcher) {
     Task t = action.getTask();
     LOG.info("LaunchTaskAction (registerTask): " + t.getTaskID());
-    TaskInProgress tip = new TaskInProgress(t, this.fConf);
+    TaskInProgress tip = new TaskInProgress(t, this.fConf, launcher);
     synchronized (this) {
       tasks.put(t.getTaskID(), tip);
       runningTasks.put(t.getTaskID(), tip);
@@ -1667,6 +1645,7 @@ public class TaskTracker
       tip.reportDiagnosticInfo(msg);
       try {
         tip.kill(true);
+        tip.cleanup(true);
       } catch (IOException ie2) {
         LOG.info("Error cleaning up " + tip.getTask().getTaskID() + ":\n" +
                  StringUtils.stringifyException(ie2));          
@@ -1750,11 +1729,17 @@ public class TaskTracker
     private long taskTimeout;
     private String debugCommand;
     private volatile boolean slotTaken = false;
+    private TaskLauncher launcher;
         
     /**
      */
     public TaskInProgress(Task task, JobConf conf) {
+      this(task, conf, null);
+    }
+    
+    public TaskInProgress(Task task, JobConf conf, TaskLauncher launcher) {
       this.task = task;
+      this.launcher = launcher;
       this.lastProgressReport = System.currentTimeMillis();
       this.defaultJobConf = conf;
       localJobConf = null;
@@ -2234,13 +2219,16 @@ public class TaskTracker
         } else {
           taskStatus.setRunState(TaskStatus.State.KILLED);
         }
-        if (slotTaken) {
-          if (task.isMapTask()) {
-            addFreeMapSlot();
-          } else {
-            addFreeReduceSlot();
-          }
+      }
+      releaseSlot();
+    }
+    
+    private synchronized void releaseSlot() {
+      if (slotTaken) {
+        if (launcher != null) {
+          launcher.addFreeSlot();
         }
+        slotTaken = false;
       }
     }
 
@@ -2518,6 +2506,11 @@ public class TaskTracker
     if (tip != null) {
       if (!commitPending) {
         tip.taskFinished();
+        // Remove the entry from taskMemoryManagerThread's data structures.
+        if (isTaskMemoryManagerEnabled()) {
+          taskMemoryManager.removeTask(taskid);
+        }
+        tip.releaseSlot();
       }
       synchronized(finishedCount) {
         finishedCount[0]++;
@@ -2526,10 +2519,6 @@ public class TaskTracker
     } else {
       LOG.warn("Unknown child task finshed: "+taskid+". Ignored.");
     }
-    // Remove the entry from taskMemoryManagerThread's data structures.
-    if (isTaskMemoryManagerEnabled()) {
-      taskMemoryManager.removeTask(taskid);
-    }
   }