Browse Source

MAPREDUCE-4314. Synchronization in JvmManager. Contributed by Benoy Antony.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.22@1346268 13f79535-47bb-0310-9956-ffa450edef68
Konstantin Shvachko 13 years ago
parent
commit
eb761f5826

+ 2 - 0
mapreduce/CHANGES.txt

@@ -62,6 +62,8 @@ Release 0.22.1 - Unreleased
     MAPREDUCE-2376. test-task-controller fails if run as a userid < 1000.
     (Todd Lipcon and Benoy Antony via shv)
 
+    MAPREDUCE-4314. Synchronization in JvmManager. (Benoy Antony via shv)
+
 Release 0.22.0 - 2011-11-29
 
   INCOMPATIBLE CHANGES

+ 26 - 26
mapreduce/src/java/org/apache/hadoop/mapred/JvmManager.java

@@ -30,15 +30,10 @@ import java.util.Vector;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
-import org.apache.hadoop.mapred.TaskController;
 import org.apache.hadoop.mapred.TaskController.DelayedProcessKiller;
+import org.apache.hadoop.mapred.TaskController.Signal;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
-import org.apache.hadoop.util.StringUtils;
 import static org.apache.hadoop.mapred.TaskController.Signal;
 
 class JvmManager {
@@ -182,9 +177,6 @@ static class JvmManagerForType {
   //Mapping from the JVM IDs to Reduce JVM processes
   Map <JVMId, JvmRunner> jvmIdToRunner = 
     new HashMap<JVMId, JvmRunner>();
-  //Mapping from the JVM IDs to process IDs
-  Map <JVMId, String> jvmIdToPid =
-    new HashMap<JVMId, String>();
   
   final int maxJvms;
   final boolean isMap;
@@ -211,7 +203,7 @@ static class JvmManagerForType {
       TaskRunner t) {
     jvmToRunningTask.put(jvmId, t);
     runningTaskToJvm.put(t,jvmId);
-    jvmIdToRunner.get(jvmId).setBusy(true);
+    jvmIdToRunner.get(jvmId).setTaskRunner(t);
   }
   
   synchronized public TaskInProgress getTaskForJvm(JVMId jvmId)
@@ -235,7 +227,7 @@ static class JvmManagerForType {
   synchronized String getPidByRunningTask(TaskRunner t) {
     JVMId id = runningTaskToJvm.get(t);
     if (id != null) {
-      return jvmIdToPid.get(id);
+      return jvmIdToRunner.get(id).getPid();
     }
     return null;
   }
@@ -243,7 +235,7 @@ static class JvmManagerForType {
   synchronized void setPidForJvm(JVMId jvmId, String pid) {
     JvmRunner runner = jvmIdToRunner.get(jvmId);
     assert runner != null : "Task must have a runner to set a pid";
-    jvmIdToPid.put(jvmId, pid);
+    runner.setPid(pid);
   }
   
   synchronized public boolean isJvmknown(JVMId jvmId) {
@@ -428,14 +420,17 @@ static class JvmManagerForType {
       volatile int numTasksRan;
       final int numTasksToRun;
       JVMId jvmId;
-      volatile boolean busy = true;
       private Task firstTask;
+      private Task task;
+      private String pidStr;
+
       
       public JvmRunner(JvmEnv env, JobID jobId, Task firstTask) {
         this.env = env;
         this.jvmId = new JVMId(jobId, isMap, rand.nextInt());
         this.numTasksToRun = env.conf.getNumTasksToExecutePerJvm();
         this.firstTask = firstTask;
+        this.task = firstTask;
         LOG.info("In JvmRunner constructed JVM ID: " + jvmId);
       }
 
@@ -461,9 +456,7 @@ static class JvmManagerForType {
         int exitCode = 0;
         try {
           env.vargs.add(Integer.toString(jvmId.getId()));
-          TaskRunner runner = jvmToRunningTask.get(jvmId);
-          if (runner != null) {
-            Task task = runner.getTask();
+          if (task != null) {
             //Launch the task controller to run task JVM
             String user = task.getUser();
             TaskAttemptID taskAttemptId = task.getTaskID();
@@ -489,6 +482,14 @@ static class JvmManagerForType {
           deleteWorkDir(tracker, firstTask);
         }
       }
+      
+     synchronized void setPid(String pid) {
+        this.pidStr = pid;
+     }
+    
+     synchronized String getPid() {
+       return pidStr;
+     }
 
       /** 
        * Kills the process. Also kills its subprocesses if the process(root of
@@ -499,16 +500,14 @@ static class JvmManagerForType {
           TaskController controller = tracker.getTaskController();
           // Check initial context before issuing a kill to prevent situations
           // where kill is issued before task is launched.
-          String pidStr = jvmIdToPid.get(jvmId);
-          if (pidStr != null) {
+          if (this.pidStr != null) {
             String user = env.conf.getUser();
-            int pid = Integer.parseInt(pidStr);
+            int pid = Integer.parseInt(this.pidStr);
             // start a thread that will kill the process dead
             if (sleeptimeBeforeSigkill > 0) {
-              controller.signalTask(user, pid, Signal.QUIT);
-              controller.signalTask(user, pid, Signal.TERM);
               new DelayedProcessKiller(user, pid, sleeptimeBeforeSigkill,
                   Signal.KILL, tracker.getTaskController()).start();
+              controller.signalTask(user, pid, Signal.TERM);
             } else {
               controller.signalTask(user, pid, Signal.KILL);
             }
@@ -520,19 +519,20 @@ static class JvmManagerForType {
         }
       }
 
-      public void taskRan() {
-        busy = false;
+      public synchronized void taskRan() {
+        task = null;
         numTasksRan++;
       }
       
       public boolean ranAll() {
         return(numTasksRan == numTasksToRun);
       }
-      public void setBusy(boolean busy) {
-        this.busy = busy;
+      
+      public synchronized void setTaskRunner(TaskRunner runner) {
+        task = runner.getTask();
       }
       public synchronized boolean isBusy() {
-        return busy;
+        return task!= null;
       }
     }
   }  

+ 3 - 5
mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestJvmManager.java

@@ -235,8 +235,6 @@ public class TestJvmManager {
    */
   @Test
   public void testForRaces() throws Exception {
-    fail("TODO: re-enable test after 2178 merge");
-    /*
     JvmManagerForType mapJvmManager = jvmManager
         .getJvmManagerForType(TaskType.MAP);
 
@@ -256,8 +254,9 @@ public class TestJvmManager {
       Task task = new MapTask(null, attemptID, i, null, 1);
       task.setConf(taskConf);
       TaskInProgress tip = tt.new TaskInProgress(task, taskConf);
+      RunningJob rjob = new RunningJob(attemptID.getJobID());
       File pidFile = new File(TEST_DIR, "pid_" + i);
-      final TaskRunner taskRunner = task.createRunner(tt, tip);
+      final TaskRunner taskRunner = task.createRunner(tt, tip,rjob);
       // launch a jvm which sleeps for 60 seconds
       final Vector<String> vargs = new Vector<String>(2);
       vargs.add(writeScript("script_" + i, "echo hi\n", pidFile).getAbsolutePath());
@@ -271,7 +270,7 @@ public class TestJvmManager {
         public void run() {
           try {
             taskRunner.launchJvmAndWait(null, vargs, stdout, stderr, 100,
-                workDir, null);
+                workDir);
           } catch (Throwable t) {
             failed.compareAndSet(null, t);
             exec.shutdownNow();
@@ -287,7 +286,6 @@ public class TestJvmManager {
     if (failed.get() != null) {
       throw new RuntimeException(failed.get());
     }
-  */
   }
 
   /**