Explorar o código

HADOOP-677. In IPC, permit a version header to be transmitted when connections are established. Contributed by Owen.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@477433 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting %!s(int64=18) %!d(string=hai) anos
pai
achega
530d0ced96

+ 5 - 0
CHANGES.txt

@@ -106,6 +106,11 @@ Trunk (unreleased changes)
 32. HADOOP-709.  Fix contrib/streaming to work with commands that
     contain control characters.  (Dhruba Borthakur via cutting)
 
+33. HADOOP-677.  In IPC, permit a version header to be transmitted
+    when connections are established.  This will permit us to change
+    the format of IPC requests back-compatibly in subsequent releases.
+    (omalley via cutting)
+
 
 Release 0.8.0 - 2006-11-03
 

+ 13 - 3
src/java/org/apache/hadoop/ipc/Client.java

@@ -31,6 +31,7 @@ import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.FilterInputStream;
 import java.io.FilterOutputStream;
+import java.io.OutputStream;
 
 import java.util.Hashtable;
 import java.util.Iterator;
@@ -43,6 +44,7 @@ import org.apache.hadoop.dfs.FSConstants;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.util.StringUtils;
 
 /** A client for an IPC service.  IPC calls take a single {@link Writable} as a
  * parameter, and return a {@link Writable} as their value.  A service runs on
@@ -52,6 +54,10 @@ import org.apache.hadoop.io.DataOutputBuffer;
  * @see Server
  */
 public class Client {
+  /** Should the client send the header on the connection? */
+  private static final boolean SEND_HEADER = false;
+  private static final byte CURRENT_VERSION = 0;
+  
   public static final Log LOG =
     LogFactory.getLog("org.apache.hadoop.ipc.Client");
   private Hashtable connections = new Hashtable();
@@ -155,7 +161,6 @@ public class Client {
           }
         }
       }
-
       socket.setSoTimeout(timeout);
       this.in = new DataInputStream
         (new BufferedInputStream
@@ -178,6 +183,10 @@ public class Client {
                }
              }
            }));
+      if (SEND_HEADER) {
+        out.write(Server.HEADER.array());
+        out.write(CURRENT_VERSION);
+      }
       notify();
     }
 
@@ -269,7 +278,7 @@ public class Client {
       } catch (EOFException eof) {
           // This is what happens when the remote side goes down
       } catch (Exception e) {
-        LOG.info(getName() + " caught: " + e, e);
+        LOG.info(StringUtils.stringifyException(e));
       } finally {
         //If there was no exception thrown in this method, then the only
         //way we reached here is by breaking out of the while loop (after
@@ -480,7 +489,8 @@ public class Client {
           Connection connection = getConnection(addresses[i]);
           connection.sendParam(call);             // send each parameter
         } catch (IOException e) {
-          LOG.info("Calling "+addresses[i]+" caught: " + e); // log errors
+          LOG.info("Calling "+addresses[i]+" caught: " + 
+                   StringUtils.stringifyException(e)); // log errors
           results.size--;                         //  wait for one fewer result
         }
       }

+ 24 - 0
src/java/org/apache/hadoop/ipc/Server.java

@@ -59,6 +59,12 @@ import org.apache.hadoop.util.*;
  * @see Client
  */
 public abstract class Server {
+  
+  /**
+   * The first four bytes of Hadoop RPC connections
+   */
+  public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
+  
   /**
    * How much time should be allocated for actually running the handler?
    * Calls that are older than ipc.timeout * MAX_CALL_QUEUE_TIME
@@ -346,6 +352,7 @@ public abstract class Server {
 
   /** Reads calls from a connection and queues them for handling. */
   private class Connection {
+    private boolean firstData = true;
     private SocketChannel channel;
     private SelectionKey key;
     private ByteBuffer data;
@@ -415,6 +422,23 @@ public abstract class Server {
         if ( count < 0 || dataLengthBuffer.remaining() > 0 ) 
           return count;        
         dataLengthBuffer.flip(); 
+        // Is this a new style header?
+        if (firstData && HEADER.equals(dataLengthBuffer)) {
+          // If so, read the version
+          ByteBuffer versionBuffer = ByteBuffer.allocate(1);
+          count = channel.read(versionBuffer);
+          if (count < 0) {
+            return count;
+          }
+          // read the first length
+          dataLengthBuffer.clear();
+          count = channel.read(dataLengthBuffer);
+          if (count < 0 || dataLengthBuffer.remaining() > 0) {
+            return count;
+          }
+          dataLengthBuffer.flip();
+          firstData = false;
+        }
         dataLength = dataLengthBuffer.getInt();
         data = ByteBuffer.allocate(dataLength);
       }