瀏覽代碼

HADOOP-1788. Increase the buffer size on Pipes' command socket from 1k to
128k. Contributed by Amareshwari Sri Ramadasu and Christian Kunz.


git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@585220 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 18 年之前
父節點
當前提交
9a2502edf8
共有 3 個文件被更改,包括 31 次插入2 次删除
  1. 3 0
      CHANGES.txt
  2. 18 0
      src/c++/pipes/impl/HadoopPipes.cc
  3. 10 2
      src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java

+ 3 - 0
CHANGES.txt

@@ -118,6 +118,9 @@ Branch 0.15 (unreleased changes)
     HADOOP-1774. Remove use of INode.parent in Block CRC upgrade.
     (Raghu Angadi via dhruba)
 
+    HADOOP-1788.  Increase the buffer size on the Pipes command socket.
+    (Amareshwari Sri Ramadasu and Christian Kunz via omalley)
+
   BUG FIXES
 
     HADOOP-1946.  The Datanode code does not need to invoke du on

+ 18 - 0
src/c++/pipes/impl/HadoopPipes.cc

@@ -855,6 +855,8 @@ namespace HadoopPipes {
       int sock = -1;
       FILE* stream = NULL;
       FILE* outStream = NULL;
+      char *bufin = NULL;
+      char *bufout = NULL;
       if (portStr) {
         sock = socket(PF_INET, SOCK_STREAM, 0);
         HADOOP_ASSERT(sock != - 1,
@@ -866,8 +868,22 @@ namespace HadoopPipes {
         HADOOP_ASSERT(connect(sock, (sockaddr*) &addr, sizeof(addr)) == 0,
                       string("problem connecting command socket: ") +
                       strerror(errno));
+
         stream = fdopen(sock, "r");
         outStream = fdopen(sock, "w");
+
+        // increase buffer size
+        int bufsize = 128*1024;
+        int setbuf;
+        bufin = new char[bufsize];
+        bufout = new char[bufsize];
+        setbuf = setvbuf(stream, bufin, _IOFBF, bufsize);
+        HADOOP_ASSERT(setbuf == 0, string("problem with setvbuf for inStream: ")
+                                     + strerror(errno));
+        setbuf = setvbuf(outStream, bufout, _IOFBF, bufsize);
+        HADOOP_ASSERT(setbuf == 0, string("problem with setvbuf for outStream: ")
+                                     + strerror(errno));
+
         connection = new BinaryProtocol(stream, context, outStream);
       } else if (getenv("hadoop.pipes.command.file")) {
         char* filename = getenv("hadoop.pipes.command.file");
@@ -907,6 +923,8 @@ namespace HadoopPipes {
       if (outStream != NULL) {
         //fclose(outStream);
       } 
+      delete bufin;
+      delete bufout;
       return true;
     } catch (Error& err) {
       fprintf(stderr, "Hadoop Pipes Exception: %s\n", 

+ 10 - 2
src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java

@@ -44,6 +44,11 @@ class BinaryProtocol<K1 extends WritableComparable, V1 extends Writable,
   implements DownwardProtocol<K1, V1> {
   
   public static final int CURRENT_PROTOCOL_VERSION = 0;
+  /**
+   * The buffer size for the command socket
+   */
+  private static final int BUFFER_SIZE = 128*1024;
+
   private DataOutputStream stream;
   private DataOutputBuffer buffer = new DataOutputBuffer();
   private static final Log LOG = 
@@ -87,7 +92,8 @@ class BinaryProtocol<K1 extends WritableComparable, V1 extends Writable,
     public UplinkReaderThread(InputStream stream,
                               UpwardProtocol<K2, V2> handler, 
                               K2 key, V2 value) throws IOException{
-      inStream = new DataInputStream(stream);
+      inStream = new DataInputStream(new BufferedInputStream(stream, 
+                                                             BUFFER_SIZE));
       this.handler = handler;
       this.key = key;
       this.value = value;
@@ -207,7 +213,8 @@ class BinaryProtocol<K1 extends WritableComparable, V1 extends Writable,
     if (Submitter.getKeepCommandFile(config)) {
       raw = new TeeOutputStream("downlink.data", raw);
     }
-    stream = new DataOutputStream(raw);
+    stream = new DataOutputStream(new BufferedOutputStream(raw, 
+                                                           BUFFER_SIZE)) ;
     uplink = new UplinkReaderThread<K2, V2>(sock.getInputStream(),
                                             handler, key, value);
     uplink.setName("pipe-uplink-handler");
@@ -287,6 +294,7 @@ class BinaryProtocol<K1 extends WritableComparable, V1 extends Writable,
   public void close() throws IOException {
     WritableUtils.writeVInt(stream, MessageType.CLOSE.code);
     LOG.debug("Sent close command");
+    stream.flush();
   }
   
   public void abort() throws IOException {