Browse Source

MAPREDUCE-3252. Fix map tasks to not rewrite data an extra time when map output fits in spill buffer. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1188424 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 13 years ago
parent
commit
43553df4e3

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -1750,6 +1750,9 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-3249. Ensure shuffle-port is correctly used duringMR AM recovery. 
     (vinodkv via acmurthy) 
 
+    MAPREDUCE-3252. Fix map tasks to not rewrite data an extra time when
+    map output fits in spill buffer. (todd)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES

+ 28 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.mapred;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
+import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
@@ -36,8 +37,10 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.SequenceFile;
@@ -1727,10 +1730,10 @@ class MapTask extends Task {
         finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
       }
       if (numSpills == 1) { //the spill is the final output
-        rfs.rename(filename[0],
+        sameVolRename(filename[0],
             mapOutputFile.getOutputFileForWriteInVolume(filename[0]));
         if (indexCacheList.size() == 0) {
-          rfs.rename(mapOutputFile.getSpillIndexFile(0),
+          sameVolRename(mapOutputFile.getSpillIndexFile(0),
             mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]));
         } else {
           indexCacheList.get(0).writeToFile(
@@ -1847,7 +1850,29 @@ class MapTask extends Task {
         }
       }
     }
-
+    
+    /**
+     * Rename srcPath to dstPath on the same volume. This is the same
+     * as RawLocalFileSystem's rename method, except that it will not
+     * fall back to a copy, and it will create the target directory
+     * if it doesn't exist.
+     */
+    private void sameVolRename(Path srcPath,
+        Path dstPath) throws IOException {
+      RawLocalFileSystem rfs = (RawLocalFileSystem)this.rfs;
+      File src = rfs.pathToFile(srcPath);
+      File dst = rfs.pathToFile(dstPath);
+      if (!dst.getParentFile().exists()) {
+        if (!dst.getParentFile().mkdirs()) {
+          throw new IOException("Unable to rename " + src + " to "
+              + dst + ": couldn't create parent directory"); 
+        }
+      }
+      
+      if (!src.renameTo(dst)) {
+        throw new IOException("Unable to rename " + src + " to " + dst);
+      }
+    }
   } // MapOutputBuffer
   
   /**