Browse Source

Fix TaskTracker to exit when errors are encountered while reading map output, in order to force re-execution of map tasks. This is overkill, since it will re-compute all map output produced by that TaskTracker, not just the output that could not be read, but this should be a rare situation. If we start seeing it frequently, we could optimize this by adding a way to tell the JobTracker that a particular previously completed map task now needs to be re-executed.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@385856 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 19 years ago
parent
commit
74238987ec

+ 20 - 3
src/java/org/apache/hadoop/mapred/MapOutputFile.java

@@ -17,6 +17,7 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+import java.util.logging.Level;
 
 import java.io.*;
 import org.apache.hadoop.io.*;
@@ -108,12 +109,28 @@ class MapOutputFile implements Writable, Configurable {
     // write the length-prefixed file content to the wire
     File file = getOutputFile(mapTaskId, partition);
     out.writeLong(file.length());
-    FSDataInputStream in = FileSystem.getNamed("local", this.jobConf).open(file);
+
+    FSDataInputStream in = null;
+    try {
+      in = FileSystem.getNamed("local", this.jobConf).open(file);
+    } catch (IOException e) {
+      // log a SEVERE exception in order to cause TaskTracker to exit
+      TaskTracker.LOG.log(Level.SEVERE, "Can't open map output:" + file, e);
+      throw e;
+    }
     try {
       byte[] buffer = new byte[8192];
-      int l;
-      while ((l = in.read(buffer)) != -1) {
+      int l  = 0;
+      
+      while (l != -1) {
         out.write(buffer, 0, l);
+        try {
+          l = in.read(buffer);
+        } catch (IOException e) {
+          // log a SEVERE exception in order to cause TaskTracker to exit
+          TaskTracker.LOG.log(Level.SEVERE,"Can't read map output:" + file, e);
+          throw e;
+        }
       }
     } finally {
       in.close();

+ 5 - 0
src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -305,6 +305,11 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
               }
             }
             lastHeartbeat = now;
+
+            if (LogFormatter.hasLoggedSevere()) {
+              LOG.info("Severe problem detected.  TaskTracker exiting.");
+              return STALE_STATE;
+            }
         }
 
         return 0;