فهرست منبع

Merge -c 1477897 from branch-1 to branch-1.2 to fix MAPREDUCE-5198. Fix a race condition during TaskTracker re-init which was causing failures since task directories were being cleaned up in multiple threads. Contributed by Arpit Gupta.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.2@1477898 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 12 سال پیش
والد
کامیت
28f05a68c4
3 فایلهای تغییر یافته به همراه 58 افزوده شده و 30 حذف شده
  1. 4 0
      CHANGES.txt
  2. 39 20
      src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
  3. 15 10
      src/mapred/org/apache/hadoop/mapred/TaskTracker.java

+ 4 - 0
CHANGES.txt

@@ -601,6 +601,10 @@ Release 1.2.0 - 2013.04.16
     false to indicate they don't want to be recovered. (Mayank Bansal via
     acmurthy) 
 
+    MAPREDUCE-5198. Fix a race condition during TaskTracker re-init which was
+    causing failures since task directories were being cleaned up in multiple
+    threads. (arpit via acmurthy)
+
 Release 1.1.2 - 2013.01.30
 
   INCOMPATIBLE CHANGES

+ 39 - 20
src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java

@@ -268,32 +268,51 @@ class LinuxTaskController extends TaskController {
 
   @Override
   public void deleteAsUser(String user, String subDir) throws IOException {
-    String[] command = 
-      new String[]{taskControllerExe, 
-                   user,
-                   localStorage.getDirsString(),
-                   Integer.toString(Commands.DELETE_AS_USER.getValue()),
-                   subDir};
-    ShellCommandExecutor shExec = new ShellCommandExecutor(command);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("deleteAsUser: " + Arrays.toString(command));
+    String[] command = new String[] { taskControllerExe, user,
+        localStorage.getDirsString(),
+        Integer.toString(Commands.DELETE_AS_USER.getValue()), subDir };
+    ShellCommandExecutor shExec = null;
+    try {
+      shExec = new ShellCommandExecutor(command);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("deleteAsUser: " + Arrays.toString(command));
+      }
+      shExec.execute();
+    } catch (IOException e) {
+      if (shExec != null) {
+        int exitCode = shExec.getExitCode();
+        LOG.info("deleteAsUser: " + Arrays.toString(command));
+        LOG.warn("Exit code is : " + exitCode);
+        LOG.info("Output from deleteAsUser LinuxTaskController:");
+        logOutput(shExec.getOutput());
+      }
+      throw e;
     }
-    shExec.execute();
   }
 
   @Override
   public void deleteLogAsUser(String user, String subDir) throws IOException {
-    String[] command = 
-      new String[]{taskControllerExe, 
-                   user,
-                   localStorage.getDirsString(),
-                   Integer.toString(Commands.DELETE_LOG_AS_USER.getValue()),
-                   subDir};
-    ShellCommandExecutor shExec = new ShellCommandExecutor(command);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("deleteLogAsUser: " + Arrays.toString(command));
+    String[] command = new String[] { taskControllerExe, user,
+        localStorage.getDirsString(),
+        Integer.toString(Commands.DELETE_LOG_AS_USER.getValue()), subDir };
+    ShellCommandExecutor shExec = null;
+    try {
+      shExec = new ShellCommandExecutor(command);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("deleteLogAsUser: " + Arrays.toString(command));
+      }
+      shExec.execute();
+    } catch (IOException e) {
+      if (shExec != null) {
+        int exitCode = shExec.getExitCode();
+        LOG.info("deleteLogAsUser: " + Arrays.toString(command));
+        LOG.warn("Exit code is : " + exitCode);
+        LOG.info("Output from deleteLogAsUser LinuxTaskController:");
+        logOutput(shExec.getOutput());
+      }
+      throw e;
     }
-    shExec.execute();
   }
 
   @Override

+ 15 - 10
src/mapred/org/apache/hadoop/mapred/TaskTracker.java

@@ -1458,7 +1458,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
       new TreeMap<TaskAttemptID, TaskInProgress>();
     tasksToClose.putAll(tasks);
     for (TaskInProgress tip : tasksToClose.values()) {
-      tip.jobHasFinished(false);
+      tip.jobHasFinished(true, false);
     }
     
     this.running = false;
@@ -2239,7 +2239,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
         rjob.distCacheMgr.release();
         // Add this tips of this job to queue of tasks to be purged 
         for (TaskInProgress tip : rjob.tasks) {
-          tip.jobHasFinished(false);
+          tip.jobHasFinished(false, false);
           Task t = tip.getTask();
           if (t.isMapTask()) {
             indexCache.removeMap(tip.getTask().getTaskID().toString());
@@ -2313,7 +2313,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
       // Remove the task from running jobs, 
       // removing the job if it's the last task
       removeTaskFromJob(tip.getTask().getJobID(), tip);
-      tip.jobHasFinished(wasFailure);
+      tip.jobHasFinished(false, wasFailure);
       if (tip.getTask().isMapTask()) {
         indexCache.removeMap(tip.getTask().getTaskID().toString());
       }
@@ -2611,7 +2611,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
           tip.reportDiagnosticInfo(msg);
           try {
             tip.kill(true);
-            tip.cleanup(true);
+            tip.cleanup(false, true);
           } catch (IOException ie2) {
             LOG.info("Error cleaning up " + tip.getTask().getTaskID(), ie2);
           } catch (InterruptedException ie2) {
@@ -3182,7 +3182,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
         removeTaskFromJob(task.getJobID(), this);
       }
       try {
-        cleanup(needCleanup);
+        cleanup(false, needCleanup);
       } catch (IOException ie) {
       }
 
@@ -3270,11 +3270,13 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
     /**
      * We no longer need anything from this task, as the job has
      * finished.  If the task is still running, kill it and clean up.
-     * 
+     *
+     * @param ttReInit is the TaskTracker executing re-initialization sequence?
      * @param wasFailure did the task fail, as opposed to was it killed by
      *                   the framework
      */
-    public void jobHasFinished(boolean wasFailure) throws IOException {
+    public void jobHasFinished(boolean ttReInit, boolean wasFailure) 
+        throws IOException {
       // Kill the task if it is still running
       synchronized(this){
         if (getRunState() == TaskStatus.State.RUNNING ||
@@ -3291,7 +3293,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
       }
       
       // Cleanup on the finished task
-      cleanup(true);
+      cleanup(ttReInit, true);
     }
 
     /**
@@ -3373,7 +3375,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
      * otherwise the current working directory of the task 
      * i.e. &lt;taskid&gt;/work is cleaned up.
      */
-    void cleanup(boolean needCleanup) throws IOException {
+    void cleanup(boolean ttReInit, boolean needCleanup) throws IOException {
       TaskAttemptID taskId = task.getTaskID();
       LOG.debug("Cleaning up " + taskId);
 
@@ -3402,7 +3404,10 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
           return;
         }
         try {
-          removeTaskFiles(needCleanup);
+          // TT re-initialization sequence: no need to cleanup, TT will cleanup
+          if (!ttReInit) {
+            removeTaskFiles(needCleanup);
+          }
         } catch (Throwable ie) {
           LOG.info("Error cleaning up task runner: "
               + StringUtils.stringifyException(ie));