Browse Source

HADOOP-1332. Fix so that TaskTracker exits reliably during unit tests on Windows. Contributed by Owen.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@543222 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 years ago
parent
commit
b1b1706727
2 changed files with 69 additions and 27 deletions
  1. 3 0
      CHANGES.txt
  2. 66 27
      src/java/org/apache/hadoop/mapred/TaskRunner.java

+ 3 - 0
CHANGES.txt

@@ -508,6 +508,9 @@ Branch 0.13 (unreleased changes)
 129. HADOOP-1242.  Improve handling of DFS upgrades.
 129. HADOOP-1242.  Improve handling of DFS upgrades.
      (Konstantin Shvachko via cutting)
      (Konstantin Shvachko via cutting)
 
 
+130. HADOOP-1332.  Fix so that TaskTracker exits reliably during unit
+     tests on Windows.  (omalley via cutting)
+
 
 
 Release 0.12.3 - 2007-04-06
 Release 0.12.3 - 2007-04-06
 
 

+ 66 - 27
src/java/org/apache/hadoop/mapred/TaskRunner.java

@@ -389,16 +389,19 @@ abstract class TaskRunner extends Thread {
    */
    */
   private void runChild(String[] args, File dir) throws IOException {
   private void runChild(String[] args, File dir) throws IOException {
     this.process = Runtime.getRuntime().exec(args, null, dir);
     this.process = Runtime.getRuntime().exec(args, null, dir);
+    
+    Thread logStdErrThread = null;
+    Thread logStdOutThread = null;
     try {
     try {
-      new Thread() {
-        public void run() {
-          // Copy stderr of the process
-          logStream(process.getErrorStream(), taskStdErrLogWriter); 
-        }
-      }.start();
-        
-      // Copy stderr of the process; normally empty
-      logStream(process.getInputStream(), taskStdOutLogWriter);		  
+      // Copy stderr of the child-process via a thread
+      logStdErrThread = logStream((t.getTaskId() + " - " + "stderr"), 
+                                   process.getErrorStream(), 
+                                   taskStdErrLogWriter);
+      
+      // Copy stdout of the child-process via a thread
+      logStdOutThread = logStream((t.getTaskId() + " - " + "stdout"), 
+                                  process.getInputStream(), 
+                                  taskStdOutLogWriter); 
       
       
       int exit_code = process.waitFor();
       int exit_code = process.waitFor();
      
      
@@ -411,8 +414,21 @@ abstract class TaskRunner extends Thread {
       throw new IOException(e.toString());
       throw new IOException(e.toString());
     } finally {
     } finally {
       kill();
       kill();
-      taskStdOutLogWriter.close();
-      taskStdErrLogWriter.close();
+      
+      // Kill both stdout/stderr copying threads 
+      if (logStdErrThread != null) {
+        logStdErrThread.interrupt();
+        try {
+          logStdErrThread.join();
+        } catch (InterruptedException ie) {}
+      }
+      
+      if (logStdOutThread != null) {
+        logStdOutThread.interrupt();
+        try {
+          logStdOutThread.join();
+        } catch (InterruptedException ie) {}
+      }
     }
     }
   }
   }
 
 
@@ -427,24 +443,47 @@ abstract class TaskRunner extends Thread {
   }
   }
 
 
   /**
   /**
+   * Spawn a new thread to copy the child-jvm's stdout/stderr streams
+   * via a {@link TaskLog.Writer}
+   * 
+   * @param threadName thread name
+   * @param stream child-jvm's stdout/stderr stream
+   * @param writer {@link TaskLog.Writer} used to copy the child-jvm's data
+   * @return Return the newly created thread
    */
    */
-  private void logStream(InputStream output, TaskLog.Writer taskLog) {
-    try {
-      byte[] buf = new byte[512];
-      int n = 0;
-      while ((n = output.read(buf, 0, buf.length)) != -1) {
-        // Write out to the task's log
-        taskLog.write(buf, 0, n);
-      }
-    } catch (IOException e) {
-      LOG.warn(t.getTaskId()+" Error reading child output", e);
-    } finally {
-      try {
-        output.close();
-      } catch (IOException e) {
-        LOG.warn(t.getTaskId()+" Error closing child output", e);
+  private Thread logStream(String threadName, 
+                           final InputStream stream, 
+                           final TaskLog.Writer taskLog) {
+    Thread loggerThread = new Thread() {
+      public void run() {
+        try {
+          byte[] buf = new byte[512];
+          while (!Thread.interrupted()) {
+            while (stream.available() > 0) {
+              int n = stream.read(buf, 0, buf.length);
+              taskLog.write(buf, 0, n);
+            }
+            Thread.sleep(1000);
+          }
+        } catch (IOException e) {
+          LOG.warn(t.getTaskId()+" Error reading child output", e);
+        } catch (InterruptedException e) {
+          // expected
+        } finally {
+          try {
+            stream.close();
+            taskLog.close();
+          } catch (IOException e) {
+            LOG.warn(t.getTaskId()+" Error closing child output", e);
+          }
+        }
       }
       }
-    }
+    };
+    loggerThread.setName(threadName);
+    loggerThread.setDaemon(true);
+    loggerThread.start();
+    
+    return loggerThread;
   }
   }
   
   
 }
 }