Pārlūkot izejas kodu

HADOOP-597. Fix TaskTracker to not discard map outputs for errors in transmission to reduce node. Contributed by Owen.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@462918 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 gadi atpakaļ
vecāks
revīzija
a8135bb91d
2 mainīti faili ar 18 papildinājumiem un 6 dzēšanām
  1. 3 0
      CHANGES.txt
  2. 15 6
      src/java/org/apache/hadoop/mapred/TaskTracker.java

+ 3 - 0
CHANGES.txt

@@ -16,6 +16,9 @@ Release 0.7.1 - unreleased
  4. HADOOP-598.  Fix tasks to retry when reporting completion, so that
     a single RPC timeout won't fail a task.  (omalley via cutting)
 
+ 5. HADOOP-597.  Fix TaskTracker to not discard map outputs for errors
+    in transmitting them to reduce nodes.  (omalley via cutting)
+
 
 Release 0.7.0 - 2006-10-06
 

+ 15 - 6
src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -1365,30 +1365,39 @@ public class TaskTracker
         Path filename = conf.getLocalPath(mapId+"/part-"+reduce+".out");
         Path filename = conf.getLocalPath(mapId+"/part-"+reduce+".out");
         response.setContentLength((int) fileSys.getLength(filename));
         response.setContentLength((int) fileSys.getLength(filename));
         InputStream inStream = null;
         InputStream inStream = null;
+        // true iff IOException was caused by attempt to access input
+        boolean isInputException = true;
         try {
         try {
           inStream = fileSys.open(filename);
           inStream = fileSys.open(filename);
           try {
           try {
             int len = inStream.read(buffer);
             int len = inStream.read(buffer);
             while (len > 0) {
             while (len > 0) {
-              outStream.write(buffer, 0, len);
+              try {
+                outStream.write(buffer, 0, len);
+              } catch (IOException ie) {
+                isInputException = false;
+                throw ie;
+              }
               len = inStream.read(buffer);
               len = inStream.read(buffer);
             }
             }
           } finally {
           } finally {
             inStream.close();
             inStream.close();
-            outStream.close();
           }
           }
         } catch (IOException ie) {
         } catch (IOException ie) {
           TaskTracker tracker = 
           TaskTracker tracker = 
             (TaskTracker) context.getAttribute("task.tracker");
             (TaskTracker) context.getAttribute("task.tracker");
           Log log = (Log) context.getAttribute("log");
           Log log = (Log) context.getAttribute("log");
-          String errorMsg = "getMapOutput(" + mapId + "," + reduceId + 
-          ") failed :\n"+
-          StringUtils.stringifyException(ie);
+          String errorMsg = ("getMapOutput(" + mapId + "," + reduceId + 
+                             ") failed :\n"+
+                             StringUtils.stringifyException(ie));
           log.warn(errorMsg);
           log.warn(errorMsg);
-          tracker.mapOutputLost(mapId, errorMsg);
+          if (isInputException) {
+            tracker.mapOutputLost(mapId, errorMsg);
+          }
           response.sendError(HttpServletResponse.SC_GONE, errorMsg);
           response.sendError(HttpServletResponse.SC_GONE, errorMsg);
           throw ie;
           throw ie;
         } 
         } 
+        outStream.close();
       }
       }
     }
     }
 }
 }