ソースを参照

HADOOP-3429. Increases the size of the buffers used for communication in Streaming jobs. Contributed by Amareshwari Sriramadasu.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@662805 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 17 年 前
コミット
1225b7e73f

+ 3 - 0
CHANGES.txt

@@ -189,6 +189,9 @@ Trunk (unreleased changes)
     HADOOP-3434. Retain the cause of the bind failure in Server::bind.
     (Steve Loughran via cdouglas)
 
+    HADOOP-3429. Increases the size of the buffers used for the communication
+    for Streaming jobs. (Amareshwari Sriramadasu via ddas)
+
   OPTIMIZATIONS
 
     HADOOP-3274. The default constructor of BytesWritable creates empty 

+ 9 - 5
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java

@@ -22,7 +22,6 @@ import java.io.*;
 import java.nio.charset.CharacterCodingException;
 import java.io.IOException;
 import java.util.Date;
-import java.util.List;
 import java.util.Map;
 import java.util.Iterator;
 import java.util.Arrays;
@@ -35,9 +34,7 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.TaskLog;
 import org.apache.hadoop.mapred.LineRecordReader.LineReader;
-import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
 
 import org.apache.hadoop.io.Text;
@@ -65,6 +62,8 @@ public abstract class PipeMapRed {
   final static int OUTSIDE = 1;
   final static int SINGLEQ = 2;
   final static int DOUBLEQ = 3;
+  
+  private final static int BUFFER_SIZE = 128 * 1024;
 
   static String[] splitArgs(String args) {
     ArrayList argList = new ArrayList();
@@ -172,8 +171,12 @@ public abstract class PipeMapRed {
       builder.environment().putAll(childEnv.toMap());
       sim = builder.start();
 
-      clientOut_ = new DataOutputStream(new BufferedOutputStream(sim.getOutputStream()));
-      clientIn_ = new DataInputStream(new BufferedInputStream(sim.getInputStream()));
+      clientOut_ = new DataOutputStream(new BufferedOutputStream(
+                                              sim.getOutputStream(),
+                                              BUFFER_SIZE));
+      clientIn_ = new DataInputStream(new BufferedInputStream(
+                                              sim.getInputStream(),
+                                              BUFFER_SIZE));
       clientErr_ = new DataInputStream(new BufferedInputStream(sim.getErrorStream()));
       startTime_ = System.currentTimeMillis();
 
@@ -457,6 +460,7 @@ public abstract class PipeMapRed {
       if (!doPipe_) return;
       try {
         if (clientOut_ != null) {
+          clientOut_.flush();
           clientOut_.close();
         }
       } catch (IOException io) {

+ 0 - 1
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java

@@ -89,7 +89,6 @@ public class PipeMapper extends PipeMapRed implements Mapper {
         }
         write(value);
         clientOut_.write('\n');
-        clientOut_.flush();
       } else {
         numRecSkipped_++;
       }

+ 0 - 1
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java

@@ -78,7 +78,6 @@ public class PipeReducer extends PipeMapRed implements Reducer {
           clientOut_.write('\t');
           write(val);
           clientOut_.write('\n');
-          clientOut_.flush();
         } else {
           // "identity reduce"
           output.collect(key, val);