浏览代码

HADOOP-2070. Added a flush method to pipes' DownwardProtocol and call that before waiting for the application to finish to ensure all buffered data is flushed. Contributed by Owen O'Malley.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@586003 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 17 年之前
父节点
当前提交
be51d05254

+ 4 - 0
CHANGES.txt

@@ -325,6 +325,10 @@ Branch 0.15 (unreleased changes)
     very uneven splits for applications like distcp that count on them.
     (omalley)
 
+    HADOOP-2070.  Added a flush method to pipes' DownwardProtocol and call
+    that before waiting for the application to finish to ensure all buffered
+    data is flushed. (Owen O'Malley via acmurthy)
+
   IMPROVEMENTS
 
     HADOOP-1908. Restructure data node code so that block sending and 

+ 4 - 2
src/java/org/apache/hadoop/mapred/pipes/Application.java

@@ -51,7 +51,7 @@ class Application<K1 extends WritableComparable, V1 extends Writable,
   private Process process;
   private Socket clientSocket;
   private OutputHandler<K2, V2> handler;
-  private BinaryProtocol<K1, V1, K2, V2> downlink;
+  private DownwardProtocol<K1, V1> downlink;
 
   /**
    * Start the child process to handle the task for us.
@@ -109,6 +109,7 @@ class Application<K1 extends WritableComparable, V1 extends Writable,
    * @throws Throwable
    */
   boolean waitForFinish() throws Throwable {
+    downlink.flush();
     return handler.waitForFinish();
   }
 
@@ -121,6 +122,7 @@ class Application<K1 extends WritableComparable, V1 extends Writable,
     LOG.info("Aborting because of " + StringUtils.stringifyException(t));
     try {
       downlink.abort();
+      downlink.flush();
     } catch (IOException e) {
       // IGNORE cleanup problems
     }
@@ -141,7 +143,7 @@ class Application<K1 extends WritableComparable, V1 extends Writable,
   void cleanup() throws IOException {
     serverSocket.close();
     try {
-      downlink.closeConnection();
+      downlink.close();
     } catch (InterruptedException ie) {
       Thread.currentThread().interrupt();
     }      

+ 6 - 3
src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java

@@ -226,7 +226,7 @@ class BinaryProtocol<K1 extends WritableComparable, V1 extends Writable,
    * @throws IOException
    * @throws InterruptedException
    */
-  public void closeConnection() throws IOException, InterruptedException {
+  public void close() throws IOException, InterruptedException {
     LOG.debug("closing connection");
     stream.close();
     uplink.closeConnection();
@@ -291,10 +291,9 @@ class BinaryProtocol<K1 extends WritableComparable, V1 extends Writable,
     writeObject(value);
   }
 
-  public void close() throws IOException {
+  public void endOfInput() throws IOException {
     WritableUtils.writeVInt(stream, MessageType.CLOSE.code);
     LOG.debug("Sent close command");
-    stream.flush();
   }
   
   public void abort() throws IOException {
@@ -302,6 +301,10 @@ class BinaryProtocol<K1 extends WritableComparable, V1 extends Writable,
     LOG.debug("Sent abort command");
   }
 
+  public void flush() throws IOException {
+    stream.flush();
+  }
+
   /**
    * Write the given object to the stream. If it is a Text or BytesWritable,
    * write it directly. Otherwise, write it to a buffer and then write the

+ 11 - 1
src/java/org/apache/hadoop/mapred/pipes/DownwardProtocol.java

@@ -97,11 +97,21 @@ interface DownwardProtocol<K extends WritableComparable, V extends Writable> {
    * input.
    * @throws IOException
    */
-  void close() throws IOException;
+  void endOfInput() throws IOException;
   
   /**
    * The task should stop as soon as possible, because something has gone wrong.
    * @throws IOException
    */
   void abort() throws IOException;
+  
+  /**
+   * Flush the data through any buffers.
+   */
+  void flush() throws IOException;
+  
+  /**
+   * Close the connection.
+   */
+  void close() throws IOException, InterruptedException;
 }

+ 1 - 1
src/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java

@@ -76,7 +76,7 @@ class PipesMapRunner<K1 extends WritableComparable, V1 extends Writable,
           // map pair to output
           downlink.mapItem(key, value);
         }
-        downlink.close();
+        downlink.endOfInput();
       }
       application.waitForFinish();
     } catch (Throwable t) {

+ 2 - 1
src/java/org/apache/hadoop/mapred/pipes/PipesReducer.java

@@ -94,8 +94,9 @@ class PipesReducer<K2 extends WritableComparable, V2 extends Writable,
     }
     try {
       if (isOk) {
-        application.getDownlink().close();
+        application.getDownlink().endOfInput();
       } else {
+        // send the abort to the application and let it clean up
         application.getDownlink().abort();
       }
       LOG.info("waiting for finish");