浏览代码

HADOOP-984. Fix a bug in shuffle error handling introduced by HADOOP-331. Contributed by Arun.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@504640 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 年之前
父节点
当前提交
866d0bf608
共有 2 个文件被更改,包括 44 次插入18 次删除
  1. 4 0
      CHANGES.txt
  2. 40 18
      src/java/org/apache/hadoop/mapred/TaskTracker.java

+ 4 - 0
CHANGES.txt

@@ -8,6 +8,10 @@ Trunk (unreleased changes)
  2. HADOOP-917.  Fix a NullPointerException in SequenceFile's merger
  2. HADOOP-917.  Fix a NullPointerException in SequenceFile's merger
     with large map outputs.  (omalley via cutting)
     with large map outputs.  (omalley via cutting)
 
 
+ 3. HADOOP-984.  Fix a bug in shuffle error handling introduced by
+    HADOOP-331.  If a map output is unavailable, the job tracker is
+    once more informed.  (Arun C Murthy via cutting)
+
 
 
 Release 0.11.0 - 2007-02-02
 Release 0.11.0 - 2007-02-02
 
 

+ 40 - 18
src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -1526,30 +1526,47 @@ public class TaskTracker
         JobConf conf = (JobConf) context.getAttribute("conf");
         JobConf conf = (JobConf) context.getAttribute("conf");
         FileSystem fileSys = 
         FileSystem fileSys = 
           (FileSystem) context.getAttribute("local.file.system");
           (FileSystem) context.getAttribute("local.file.system");
-        //open index file
+
+        // Index file
         Path indexFileName = conf.getLocalPath(mapId+"/file.out.index");
         Path indexFileName = conf.getLocalPath(mapId+"/file.out.index");
-        FSDataInputStream in = fileSys.open(indexFileName);
-        //seek to the correct offset for the given reduce
-        in.seek(reduce * 16);
-        
-        //read the offset and length of the partition data
-        long startOffset = in.readLong();
-        long partLength = in.readLong();
-        
-        in.close();
+        FSDataInputStream indexIn = null;
          
          
+        // Map-output file
         Path mapOutputFileName = conf.getLocalPath(mapId+"/file.out"); 
         Path mapOutputFileName = conf.getLocalPath(mapId+"/file.out"); 
-           
-        response.setContentLength((int) partLength);
-        FSDataInputStream inStream = null;
+        FSDataInputStream mapOutputIn = null;
+        
         // true iff IOException was caused by attempt to access input
         // true iff IOException was caused by attempt to access input
         boolean isInputException = true;
         boolean isInputException = true;
+        
         try {
         try {
-          inStream = fileSys.open(mapOutputFileName);
-          inStream.seek(startOffset);
+          /**
+           * Read the index file to get the information about where
+           * the map-output for the given reducer is available. 
+           */
+          //open index file
+          indexIn = fileSys.open(indexFileName);
+
+          //seek to the correct offset for the given reduce
+          indexIn.seek(reduce * 16);
+          
+          //read the offset and length of the partition data
+          long startOffset = indexIn.readLong();
+          long partLength = indexIn.readLong();
+
+          //set the content-length header
+          response.setContentLength((int) partLength);
+
+          /**
+           * Read the data from the sigle map-output file and
+           * send it to the reducer.
+           */
+          //open the map-output file
+          mapOutputIn = fileSys.open(mapOutputFileName);
+          //seek to the correct offset for the reduce
+          mapOutputIn.seek(startOffset);
           try {
           try {
             int totalRead = 0;
             int totalRead = 0;
-            int len = inStream.read(buffer, 0,
+            int len = mapOutputIn.read(buffer, 0,
                                  partLength < MAX_BYTES_TO_READ 
                                  partLength < MAX_BYTES_TO_READ 
                                  ? (int)partLength : MAX_BYTES_TO_READ);
                                  ? (int)partLength : MAX_BYTES_TO_READ);
             while (len > 0) {
             while (len > 0) {
@@ -1561,12 +1578,17 @@ public class TaskTracker
               }
               }
               totalRead += len;
               totalRead += len;
               if (totalRead == partLength) break;
               if (totalRead == partLength) break;
-              len = inStream.read(buffer, 0, 
+              len = mapOutputIn.read(buffer, 0, 
                       (partLength - totalRead) < MAX_BYTES_TO_READ
                       (partLength - totalRead) < MAX_BYTES_TO_READ
                        ? (int)(partLength - totalRead) : MAX_BYTES_TO_READ);
                        ? (int)(partLength - totalRead) : MAX_BYTES_TO_READ);
             }
             }
           } finally {
           } finally {
-            inStream.close();
+            if (indexIn != null) {
+              indexIn.close();
+            }
+            if (mapOutputIn != null) {
+              mapOutputIn.close();
+            }
           }
           }
         } catch (IOException ie) {
         } catch (IOException ie) {
           TaskTracker tracker = 
           TaskTracker tracker =