浏览代码

HADOOP-1324. Change so that an FSError kills only the task that generates it rather than the entire task tracker. Contributed by Arun.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@536000 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 年之前
父节点
当前提交
f12c972aa4

+ 4 - 0
CHANGES.txt

@@ -356,6 +356,10 @@ Trunk (unreleased changes)
      More care is also taken to not allocate files on full or offline
      More care is also taken to not allocate files on full or offline
      drives.  (Devaraj Das via cutting)
      drives.  (Devaraj Das via cutting)
 
 
+106. HADOOP-1324.  Change so that an FSError kills only the task that
+     generates it rather than the entire task tracker.
+     (Arun C Murthy via cutting)
+
 
 
 Release 0.12.3 - 2007-04-06
 Release 0.12.3 - 2007-04-06
 
 

+ 2 - 2
src/java/org/apache/hadoop/mapred/IsolationRunner.java

@@ -47,8 +47,8 @@ public class IsolationRunner {
       LOG.info("Task " + taskid + " reporting done.");
       LOG.info("Task " + taskid + " reporting done.");
     }
     }
 
 
-    public void fsError(String message) throws IOException {
-      LOG.info("Task reporting file system error: " + message);
+    public void fsError(String taskId, String message) throws IOException {
+      LOG.info("Task " + taskId + " reporting file system error: " + message);
     }
     }
 
 
     public Task getTask(String taskid) throws IOException {
     public Task getTask(String taskid) throws IOException {

+ 3 - 2
src/java/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -236,8 +236,9 @@ class LocalJobRunner implements JobSubmissionProtocol {
       }
       }
     }
     }
 
 
-    public synchronized void fsError(String message) throws IOException {
-      LOG.fatal("FSError: "+ message);
+    public synchronized void fsError(String taskId, String message) 
+    throws IOException {
+      LOG.fatal("FSError: "+ message + "from task: " + taskId);
     }
     }
 
 
     public TaskCompletionEvent[] getMapCompletionEvents(
     public TaskCompletionEvent[] getMapCompletionEvents(

+ 1 - 1
src/java/org/apache/hadoop/mapred/TaskRunner.java

@@ -289,7 +289,7 @@ abstract class TaskRunner extends Thread {
     } catch (FSError e) {
     } catch (FSError e) {
       LOG.fatal("FSError", e);
       LOG.fatal("FSError", e);
       try {
       try {
-        tracker.fsError(e.getMessage());
+        tracker.fsError(t.getTaskId(), e.getMessage());
       } catch (IOException ie) {
       } catch (IOException ie) {
         LOG.fatal(t.getTaskId()+" reporting FSError", ie);
         LOG.fatal(t.getTaskId()+" reporting FSError", ie);
       }
       }

+ 10 - 6
src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -1577,11 +1577,15 @@ public class TaskTracker
     }
     }
   }
   }
 
 
-  /** A child task had a local filesystem error.  Exit, so that no future
-   * jobs are accepted. */
-  public synchronized void fsError(String message) throws IOException {
-    LOG.fatal("FSError, exiting: "+ message);
-    running = false;
+  /** 
+   * A child task had a local filesystem error. Kill the task.
+   */  
+  public synchronized void fsError(String taskId, String message) 
+  throws IOException {
+    LOG.fatal("Task: " + taskId + " - Killed due to FSError: " + message);
+    TaskInProgress tip = runningTasks.get(taskId);
+    tip.reportDiagnosticInfo("FSError: " + message);
+    purgeTask(tip);
   }
   }
 
 
   public TaskCompletionEvent[] getMapCompletionEvents(
   public TaskCompletionEvent[] getMapCompletionEvents(
@@ -1705,7 +1709,7 @@ public class TaskTracker
         task.run(job, umbilical);             // run the task
         task.run(job, umbilical);             // run the task
       } catch (FSError e) {
       } catch (FSError e) {
         LOG.fatal("FSError from child", e);
         LOG.fatal("FSError from child", e);
-        umbilical.fsError(e.getMessage());
+        umbilical.fsError(taskid, e.getMessage());
       } catch (Throwable throwable) {
       } catch (Throwable throwable) {
         LOG.warn("Error running child", throwable);
         LOG.warn("Error running child", throwable);
         // Report back any failures, for diagnostic purposes
         // Report back any failures, for diagnostic purposes

+ 1 - 1
src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java

@@ -63,7 +63,7 @@ interface TaskUmbilicalProtocol extends VersionedProtocol {
   void done(String taskid) throws IOException;
   void done(String taskid) throws IOException;
 
 
   /** Report that the task encounted a local filesystem error.*/
   /** Report that the task encounted a local filesystem error.*/
-  void fsError(String message) throws IOException;
+  void fsError(String taskId, String message) throws IOException;
 
 
   /** Called by a reduce task to get the map output locations for finished maps.
   /** Called by a reduce task to get the map output locations for finished maps.
    *
    *