Browse Source

HADOOP-7776 Make the Ipc-Header in a RPC-Payload an explicit header (sanjay)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1197885 13f79535-47bb-0310-9956-ffa450edef68
Sanjay Radia 13 năm trước cách đây
mục cha
commit
072bdd85d1

+ 2 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -55,6 +55,8 @@ Trunk (unreleased changes)
     HADOOP-7792. Add verifyToken method to AbstractDelegationTokenSecretManager.
     (jitendra)
 
+		HADOOP-7776 Make the Ipc-Header in a RPC-Payload an explicit header (sanjay)
+
   BUGS
 
     HADOOP-7606. Upgrade Jackson to version 1.7.1 to match the version required

+ 73 - 33
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java

@@ -48,6 +48,7 @@ import org.apache.commons.logging.*;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RpcPayloadHeader.*;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -152,16 +153,20 @@ public class Client {
     return refCount==0;
   }
 
-  /** A call waiting for a value. */
+  /** 
+   * Class that represents an RPC call
+   */
   private class Call {
-    int id;                                       // call id
-    Writable param;                               // parameter
-    Writable value;                               // value, null if error
-    IOException error;                            // exception, null if value
-    boolean done;                                 // true when call is done
-
-    protected Call(Writable param) {
-      this.param = param;
+    final int id;               // call id
+    final Writable rpcRequest;  // the serialized rpc request - RpcPayload
+    Writable rpcResponse;       // null if rpc has error
+    IOException error;          // exception, null if success
+    final RpcKind rpcKind;      // Rpc EngineKind
+    boolean done;               // true when call is done
+
+    protected Call(RpcKind rpcKind, Writable param) {
+      this.rpcKind = rpcKind;
+      this.rpcRequest = param;
       synchronized (Client.this) {
         this.id = counter++;
       }
@@ -187,15 +192,15 @@ public class Client {
     /** Set the return value when there is no error. 
      * Notify the caller the call is done.
      * 
-     * @param value return value of the call.
+     * @param rpcResponse return value of the rpc call.
      */
-    public synchronized void setValue(Writable value) {
-      this.value = value;
+    public synchronized void setRpcResponse(Writable rpcResponse) {
+      this.rpcResponse = rpcResponse;
       callComplete();
     }
     
-    public synchronized Writable getValue() {
-      return value;
+    public synchronized Writable getRpcResult() {
+      return rpcResponse;
     }
   }
 
@@ -727,6 +732,7 @@ public class Client {
       }
     }
 
+    @SuppressWarnings("unused")
     public InetSocketAddress getRemoteAddress() {
       return server;
     }
@@ -787,8 +793,10 @@ public class Client {
           //for serializing the
           //data to be written
           d = new DataOutputBuffer();
-          d.writeInt(call.id);
-          call.param.write(d);
+          RpcPayloadHeader header = new RpcPayloadHeader(
+              call.rpcKind, RpcPayloadOperation.RPC_FINAL_PAYLOAD, call.id);
+          header.write(d);
+          call.rpcRequest.write(d);
           byte[] data = d.getData();
           int dataLength = d.getLength();
           out.writeInt(dataLength);      //first put the data length
@@ -825,7 +833,7 @@ public class Client {
         if (state == Status.SUCCESS.state) {
           Writable value = ReflectionUtils.newInstance(valueClass, conf);
           value.readFields(in);                 // read value
-          call.setValue(value);
+          call.setRpcResponse(value);
           calls.remove(id);
         } else if (state == Status.ERROR.state) {
           call.setException(new RemoteException(WritableUtils.readString(in),
@@ -909,7 +917,7 @@ public class Client {
     private int index;
     
     public ParallelCall(Writable param, ParallelResults results, int index) {
-      super(param);
+      super(RpcKind.RPC_WRITABLE, param);
       this.results = results;
       this.index = index;
     }
@@ -933,7 +941,7 @@ public class Client {
 
     /** Collect a result. */
     public synchronized void callComplete(ParallelCall call) {
-      values[call.index] = call.getValue();       // store the value
+      values[call.index] = call.getRpcResult();       // store the value
       count++;                                    // count it
       if (count == size)                          // if all values are in
         notify();                                 // then notify waiting caller
@@ -993,15 +1001,23 @@ public class Client {
     }
   }
 
+  /**
+   * Same as {@link #call(RpcKind, Writable, ConnectionId)} for Writable
+   */
+  public Writable call(Writable param, InetSocketAddress address)
+  throws InterruptedException, IOException {
+    return call(RpcKind.RPC_WRITABLE, param, address);
+    
+  }
   /** Make a call, passing <code>param</code>, to the IPC server running at
    * <code>address</code>, returning the value.  Throws exceptions if there are
    * network problems or if the remote code threw an exception.
-   * @deprecated Use {@link #call(Writable, ConnectionId)} instead 
+   * @deprecated Use {@link #call(RpcKind, Writable, ConnectionId)} instead 
    */
   @Deprecated
-  public Writable call(Writable param, InetSocketAddress address)
+  public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress address)
   throws InterruptedException, IOException {
-      return call(param, address, null);
+      return call(rpcKind, param, address, null);
   }
   
   /** Make a call, passing <code>param</code>, to the IPC server running at
@@ -1009,15 +1025,15 @@ public class Client {
    * the value.  
    * Throws exceptions if there are network problems or if the remote code 
    * threw an exception.
-   * @deprecated Use {@link #call(Writable, ConnectionId)} instead 
+   * @deprecated Use {@link #call(RpcKind, Writable, ConnectionId)} instead 
    */
   @Deprecated
-  public Writable call(Writable param, InetSocketAddress addr, 
+  public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr, 
       UserGroupInformation ticket)  
       throws InterruptedException, IOException {
     ConnectionId remoteId = ConnectionId.getConnectionId(addr, null, ticket, 0,
         conf);
-    return call(param, remoteId);
+    return call(rpcKind, param, remoteId);
   }
   
   /** Make a call, passing <code>param</code>, to the IPC server running at
@@ -1026,18 +1042,33 @@ public class Client {
    * timeout, returning the value.  
    * Throws exceptions if there are network problems or if the remote code 
    * threw an exception. 
-   * @deprecated Use {@link #call(Writable, ConnectionId)} instead 
+   * @deprecated Use {@link #call(RpcKind, Writable, ConnectionId)} instead 
    */
   @Deprecated
-  public Writable call(Writable param, InetSocketAddress addr, 
+  public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr, 
                        Class<?> protocol, UserGroupInformation ticket,
                        int rpcTimeout)  
                        throws InterruptedException, IOException {
     ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
         ticket, rpcTimeout, conf);
-    return call(param, remoteId);
+    return call(rpcKind, param, remoteId);
   }
 
+  
+  /**
+   * Same as {@link #call(RpcKind, Writable, InetSocketAddress, 
+   * Class, UserGroupInformation, int, Configuration)}
+   * except that rpcKind is writable.
+   */
+  public Writable call(Writable param, InetSocketAddress addr, 
+      Class<?> protocol, UserGroupInformation ticket,
+      int rpcTimeout, Configuration conf)  
+      throws InterruptedException, IOException {
+        ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
+        ticket, rpcTimeout, conf);
+        return call(RpcKind.RPC_WRITABLE, param, remoteId);
+  }
+  
   /**
    * Make a call, passing <code>param</code>, to the IPC server running at
    * <code>address</code> which is servicing the <code>protocol</code> protocol,
@@ -1046,22 +1077,31 @@ public class Client {
    * value. Throws exceptions if there are network problems or if the remote
    * code threw an exception.
    */
-  public Writable call(Writable param, InetSocketAddress addr, 
+  public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr, 
                        Class<?> protocol, UserGroupInformation ticket,
                        int rpcTimeout, Configuration conf)  
                        throws InterruptedException, IOException {
     ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
         ticket, rpcTimeout, conf);
-    return call(param, remoteId);
+    return call(rpcKind, param, remoteId);
+  }
+  
+  /**
+   * Same as {link {@link #call(RpcKind, Writable, ConnectionId)}
+   * except the rpcKind is RPC_WRITABLE
+   */
+  public Writable call(Writable param, ConnectionId remoteId)  
+      throws InterruptedException, IOException {
+     return call(RpcKind.RPC_WRITABLE, param, remoteId);
   }
   
   /** Make a call, passing <code>param</code>, to the IPC server defined by
    * <code>remoteId</code>, returning the value.  
    * Throws exceptions if there are network problems or if the remote code 
    * threw an exception. */
-  public Writable call(Writable param, ConnectionId remoteId)  
+  public Writable call(RpcKind rpcKind, Writable param, ConnectionId remoteId)  
       throws InterruptedException, IOException {
-    Call call = new Call(param);
+    Call call = new Call(rpcKind, param);
     Connection connection = getConnection(remoteId, call);
     connection.sendParam(call);                 // send the parameter
     boolean interrupted = false;
@@ -1093,7 +1133,7 @@ public class Client {
                   call.error);
         }
       } else {
-        return call.value;
+        return call.rpcResponse;
       }
     }
   }

+ 2 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcExceptionProto;
 import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto;
 import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcResponseProto;
@@ -139,7 +140,7 @@ public class ProtobufRpcEngine implements RpcEngine {
       HadoopRpcRequestProto rpcRequest = constructRpcRequest(method, args);
       RpcResponseWritable val = null;
       try {
-        val = (RpcResponseWritable) client.call(
+        val = (RpcResponseWritable) client.call(RpcKind.RPC_PROTOCOL_BUFFER,
             new RpcRequestWritable(rpcRequest), remoteId);
       } catch (Exception e) {
         RpcClientException ce = new RpcClientException("Client exception", e);

+ 118 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java

@@ -0,0 +1,118 @@
+package org.apache.hadoop.ipc;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * This is the rpc payload header. It is sent with every rpc call
+ * <pre>
+ * The format of RPC call is as follows:
+ * +---------------------------------------------------+
+ * |  Rpc length in bytes (header + payload length)    |
+ * +---------------------------------------------------+
+ * |      Rpc Header       |       Rpc Payload         |
+ * +---------------------------------------------------+
+ * 
+ * The format of Rpc Header is:
+ * +----------------------------------+
+ * |  RpcKind (1 bytes)               |      
+ * +----------------------------------+
+ * |  RpcPayloadOperation (1 bytes)   |      
+ * +----------------------------------+
+ * |  Call ID (4 bytes)               |      
+ * +----------------------------------+
+ * 
+ * {@link RpcKind} determines the type of serialization used for Rpc Payload.
+ * </pre>
+ * <p>
+ * <b>Note this header does NOT have its own version number, 
+ * it used the version number from the connection header. </b>
+ */
+public class RpcPayloadHeader implements Writable {
+  public enum RpcPayloadOperation {
+    RPC_FINAL_PAYLOAD ((short)1),
+    RPC_CONTINUATION_PAYLOAD ((short)2), // not implemented yet
+    RPC_CLOSE_CONNECTION ((short)3);     // close the rpc connection
+    
+    private final short code;
+    private static final short FIRST_INDEX = RPC_FINAL_PAYLOAD.code;
+    RpcPayloadOperation(short val) {
+      this.code = val;
+    }
+    
+    public void write(DataOutput out) throws IOException {  
+      out.writeByte(code);
+    }
+    
+    static RpcPayloadOperation readFields(DataInput in) throws IOException {
+      short inValue = in.readByte();
+      return RpcPayloadOperation.values()[inValue - FIRST_INDEX];
+    }
+  }
+  
+  public enum RpcKind {
+    RPC_BUILTIN ((short ) 1),  // Used for built in calls
+    RPC_WRITABLE ((short ) 2),
+    RPC_PROTOCOL_BUFFER ((short)3), 
+    RPC_AVRO ((short)4);
+    
+    private final short value;
+    private static final short FIRST_INDEX = RPC_BUILTIN.value;
+    RpcKind(short val) {
+      this.value = val;
+    }
+    
+    public void write(DataOutput out) throws IOException {
+      out.writeByte(value);
+    }
+    
+    static RpcKind readFields(DataInput in) throws IOException {
+      short inValue = in.readByte();
+      return RpcKind.values()[inValue - FIRST_INDEX];
+    }  
+  }
+  
+  private RpcKind kind;
+  private RpcPayloadOperation operation;
+  private int callId;
+  
+  public RpcPayloadHeader() {
+    kind = RpcKind.RPC_WRITABLE;
+    operation = RpcPayloadOperation.RPC_CLOSE_CONNECTION;
+  }
+  
+  public RpcPayloadHeader(RpcKind kind, RpcPayloadOperation op, int callId) {
+    this.kind  = kind;
+    this.operation = op;
+    this.callId = callId;
+  }
+  
+  int getCallId() {
+    return callId;
+  }
+  
+  RpcKind getkind() {
+    return kind;
+  }
+  
+  RpcPayloadOperation getOperation() {
+    return operation;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    kind.write(out);
+    operation.write(out);
+    out.writeInt(callId); 
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    kind = RpcKind.readFields(in);
+    operation = RpcPayloadOperation.readFields(in);
+    this.callId = in.readInt();
+  }
+}

+ 46 - 32
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

@@ -61,11 +61,13 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ipc.RpcPayloadHeader.*;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcPayloadOperation;
 import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
 import org.apache.hadoop.ipc.metrics.RpcMetrics;
 import org.apache.hadoop.net.NetUtils;
@@ -124,7 +126,8 @@ public abstract class Server {
   // 4 : Introduced SASL security layer
   // 5 : Introduced use of {@link ArrayPrimitiveWritable$Internal}
   //     in ObjectWritable to efficiently transmit arrays of primitives
-  public static final byte CURRENT_VERSION = 5;
+  // 6 : Made RPC payload header explicit
+  public static final byte CURRENT_VERSION = 6;
 
   /**
    * Initial and max size of response buffer
@@ -277,28 +280,33 @@ public abstract class Server {
 
   /** A call queued for handling. */
   private static class Call {
-    private int id;                               // the client's call id
-    private Writable param;                       // the parameter passed
-    private Connection connection;                // connection to client
-    private long timestamp;     // the time received when response is null
-                                   // the time served when response is not null
-    private ByteBuffer response;                      // the response for this call
-
-    public Call(int id, Writable param, Connection connection) { 
-      this.id = id;
-      this.param = param;
+    private final int callId;             // the client's call id
+    private final Writable rpcRequest;    // Serialized Rpc request from client
+    private final Connection connection;  // connection to client
+    private long timestamp;               // time received when response is null
+                                          // time served when response is not null
+    private ByteBuffer rpcResponse;       // the response for this call
+    private final RpcKind rpcKind;
+
+    public Call(int id, Writable param, Connection connection) {
+      this( id,  param,  connection, RpcKind.RPC_BUILTIN );    
+    }
+    public Call(int id, Writable param, Connection connection, RpcKind kind) { 
+      this.callId = id;
+      this.rpcRequest = param;
       this.connection = connection;
       this.timestamp = System.currentTimeMillis();
-      this.response = null;
+      this.rpcResponse = null;
+      this.rpcKind = kind;
     }
     
     @Override
     public String toString() {
-      return param.toString() + " from " + connection.toString();
+      return rpcRequest.toString() + " from " + connection.toString();
     }
 
     public void setResponse(ByteBuffer response) {
-      this.response = response;
+      this.rpcResponse = response;
     }
   }
 
@@ -795,17 +803,17 @@ public abstract class Server {
           call = responseQueue.removeFirst();
           SocketChannel channel = call.connection.channel;
           if (LOG.isDebugEnabled()) {
-            LOG.debug(getName() + ": responding to #" + call.id + " from " +
+            LOG.debug(getName() + ": responding to #" + call.callId + " from " +
                       call.connection);
           }
           //
           // Send as much data as we can in the non-blocking fashion
           //
-          int numBytes = channelWrite(channel, call.response);
+          int numBytes = channelWrite(channel, call.rpcResponse);
           if (numBytes < 0) {
             return true;
           }
-          if (!call.response.hasRemaining()) {
+          if (!call.rpcResponse.hasRemaining()) {
             call.connection.decRpcCount();
             if (numElements == 1) {    // last call fully processes.
               done = true;             // no more data for this channel.
@@ -813,7 +821,7 @@ public abstract class Server {
               done = false;            // more calls pending to be sent.
             }
             if (LOG.isDebugEnabled()) {
-              LOG.debug(getName() + ": responding to #" + call.id + " from " +
+              LOG.debug(getName() + ": responding to #" + call.callId + " from " +
                         call.connection + " Wrote " + numBytes + " bytes.");
             }
           } else {
@@ -841,7 +849,7 @@ public abstract class Server {
               }
             }
             if (LOG.isDebugEnabled()) {
-              LOG.debug(getName() + ": responding to #" + call.id + " from " +
+              LOG.debug(getName() + ": responding to #" + call.callId + " from " +
                         call.connection + " Wrote partial " + numBytes + 
                         " bytes.");
             }
@@ -1408,18 +1416,24 @@ public abstract class Server {
     private void processData(byte[] buf) throws  IOException, InterruptedException {
       DataInputStream dis =
         new DataInputStream(new ByteArrayInputStream(buf));
-      int id = dis.readInt();                    // try to read an id
+      RpcPayloadHeader header = new RpcPayloadHeader();
+      header.readFields(dis);           // Read the RpcPayload header
         
       if (LOG.isDebugEnabled())
-        LOG.debug(" got #" + id);
-      Writable param;
-      try {
-        param = ReflectionUtils.newInstance(paramClass, conf);//read param
-        param.readFields(dis);
+        LOG.debug(" got #" + header.getCallId());
+      if (header.getOperation() != RpcPayloadOperation.RPC_FINAL_PAYLOAD) {
+        throw new IOException("IPC Server does not implement operation" + 
+              header.getOperation());
+      }
+      Writable rpcRequest;
+      try { //Read the rpc request
+        rpcRequest = ReflectionUtils.newInstance(paramClass, conf);
+        rpcRequest.readFields(dis);
       } catch (Throwable t) {
         LOG.warn("Unable to read call parameters for client " +
                  getHostAddress(), t);
-        final Call readParamsFailedCall = new Call(id, null, this);
+        final Call readParamsFailedCall = 
+            new Call(header.getCallId(), null, this);
         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
 
         setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null,
@@ -1429,7 +1443,7 @@ public abstract class Server {
         return;
       }
         
-      Call call = new Call(id, param, this);
+      Call call = new Call(header.getCallId(), rpcRequest, this, header.getkind());
       callQueue.put(call);              // queue the call; maybe blocked here
       incRpcCount();  // Increment the rpc count
     }
@@ -1493,8 +1507,8 @@ public abstract class Server {
           final Call call = callQueue.take(); // pop the queue; maybe blocked here
 
           if (LOG.isDebugEnabled())
-            LOG.debug(getName() + ": has #" + call.id + " from " +
-                      call.connection);
+            LOG.debug(getName() + ": has Call#" + call.callId + 
+                "for RpcKind " + call.rpcKind + " from " + call.connection);
           
           String errorClass = null;
           String error = null;
@@ -1505,7 +1519,7 @@ public abstract class Server {
             // Make the call as the user via Subject.doAs, thus associating
             // the call with the Subject
             if (call.connection.user == null) {
-              value = call(call.connection.protocolName, call.param, 
+              value = call(call.connection.protocolName, call.rpcRequest, 
                            call.timestamp);
             } else {
               value = 
@@ -1515,7 +1529,7 @@ public abstract class Server {
                      public Writable run() throws Exception {
                        // make the call
                        return call(call.connection.protocolName, 
-                                   call.param, call.timestamp);
+                                   call.rpcRequest, call.timestamp);
 
                      }
                    }
@@ -1657,7 +1671,7 @@ public abstract class Server {
   throws IOException {
     response.reset();
     DataOutputStream out = new DataOutputStream(response);
-    out.writeInt(call.id);                // write call id
+    out.writeInt(call.callId);                // write call id
     out.writeInt(status.state);           // write status
 
     if (status == Status.SUCCESS) {

+ 2 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java

@@ -39,6 +39,7 @@ import javax.net.SocketFactory;
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
@@ -242,7 +243,7 @@ public class WritableRpcEngine implements RpcEngine {
       }
 
       ObjectWritable value = (ObjectWritable)
-        client.call(new Invocation(method, args), remoteId);
+        client.call(RpcKind.RPC_WRITABLE, new Invocation(method, args), remoteId);
       if (LOG.isDebugEnabled()) {
         long callTime = System.currentTimeMillis() - startTime;
         LOG.debug("Call: " + method.getName() + " " + callTime);