浏览代码

HADOOP-3204. Fixes a problem to do with ReduceTask's LocalFSMerger not catching Throwable. Contributed by Amar Ramesh Kamat.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@647144 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 17 年之前
父节点
当前提交
477749e6c5
共有 2 个文件被更改,包括 18 次插入5 次删除
  1. 3 0
      CHANGES.txt
  2. 15 5
      src/java/org/apache/hadoop/mapred/ReduceTask.java

+ 3 - 0
CHANGES.txt

@@ -568,6 +568,9 @@ Release 0.17.0 - Unreleased
    HADOOP-1373. checkPath() should ignore case when it compares authoriy.
    (Edward J. Yoon via rangadi)
 
+   HADOOP-3204. Fixes a problem to do with ReduceTask's LocalFSMerger not
+   catching Throwable.  (Amar Ramesh Kamat via ddas)
+
 Release 0.16.3 - Unreleased
 
   BUG FIXES

+ 15 - 5
src/java/org/apache/hadoop/mapred/ReduceTask.java

@@ -1380,6 +1380,10 @@ class ReduceTask extends Task {
             LOG.warn(reduceTask.getTaskId() +
                      " Final merge of the inmemory files threw an exception: " + 
                      StringUtils.stringifyException(t));
+            // check if the last merge generated an error
+            if (mergeThrowable != null) {
+              mergeThrowable = t;
+            }
             return false;
           }
         }
@@ -1548,9 +1552,15 @@ class ReduceTask extends Task {
             .suffix(".merged");
           SequenceFile.Writer writer =
             sorter.cloneFileAttributes(mapFiles[0], outputPath, null);
-          SequenceFile.Sorter.RawKeyValueIterator iter;
+          SequenceFile.Sorter.RawKeyValueIterator iter  = null;
           Path tmpDir = new Path(reduceTask.getTaskId());
-          iter = sorter.merge(mapFiles, true, ioSortFactor, tmpDir);
+          try {
+            iter = sorter.merge(mapFiles, true, ioSortFactor, tmpDir);
+          } catch (Exception e) {
+            writer.close();
+            localFileSys.delete(outputPath, true);
+            throw new IOException (StringUtils.stringifyException(e));
+          }
           sorter.writeFile(iter, writer);
           writer.close();
           
@@ -1560,12 +1570,12 @@ class ReduceTask extends Task {
           
           LOG.info(reduceTask.getTaskId()
                    + " Finished merging map output files on disk.");
-        } catch (IOException ioe) {
+        } catch (Throwable t) {
           LOG.warn(reduceTask.getTaskId()
                    + " Merging of the local FS files threw an exception: "
-                   + StringUtils.stringifyException(ioe));
+                   + StringUtils.stringifyException(t));
           if (mergeThrowable == null) {
-            mergeThrowable = ioe;
+            mergeThrowable = t;
           }
         } finally {
           localFSMergeInProgress = false;