Explorar el Código

Merged HADOOP-9425

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1483146 13f79535-47bb-0310-9956-ffa450edef68
Sanjay Radia hace 12 años
padre
commit
c74b890261

+ 2 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -12,6 +12,8 @@ Release 2.0.5-beta - UNRELEASED
 
     HADOOP-9380 Add totalLength to rpc response  (sanjay Radia)
 
+    HADOOP-9425 Add error codes to rpc-response (sanjay Radia)
+
   NEW FEATURES
 
     HADOOP-9194. RPC support for QoS. (Junping Du via llu)

+ 9 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java

@@ -65,6 +65,7 @@ import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
 import org.apache.hadoop.net.ConnectTimeoutException;
 import org.apache.hadoop.net.NetUtils;
@@ -985,8 +986,15 @@ public class Client {
                   "ServerDidNotSetExceptionClassName";
           final String errorMsg = header.hasErrorMsg() ? 
                 header.getErrorMsg() : "ServerDidNotSetErrorMsg" ;
+          final RpcErrorCodeProto erCode = 
+                    (header.hasErrorDetail() ? header.getErrorDetail() : null);
+          if (erCode == null) {
+             LOG.warn("Detailed error code not set by server on rpc error");
+          }
           RemoteException re = 
-              new RemoteException(exceptionClassName, errorMsg);
+              ( (erCode == null) ? 
+                  new RemoteException(exceptionClassName, errorMsg) :
+              new RemoteException(exceptionClassName, errorMsg, erCode));
           if (status == RpcStatusProto.ERROR) {
             call.setException(re);
             calls.remove(callId);

+ 6 - 5
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java

@@ -437,8 +437,8 @@ public class ProtobufRpcEngine implements RpcEngine {
      */
     static class ProtoBufRpcInvoker implements RpcInvoker {
       private static ProtoClassProtoImpl getProtocolImpl(RPC.Server server,
-          String protoName, long version) throws IOException {
-        ProtoNameVer pv = new ProtoNameVer(protoName, version);
+          String protoName, long clientVersion) throws RpcServerException {
+        ProtoNameVer pv = new ProtoNameVer(protoName, clientVersion);
         ProtoClassProtoImpl impl = 
             server.getProtocolImplMap(RPC.RpcKind.RPC_PROTOCOL_BUFFER).get(pv);
         if (impl == null) { // no match for Protocol AND Version
@@ -446,10 +446,11 @@ public class ProtobufRpcEngine implements RpcEngine {
               server.getHighestSupportedProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, 
                   protoName);
           if (highest == null) {
-            throw new IOException("Unknown protocol: " + protoName);
+            throw new RpcNoSuchProtocolException(
+                "Unknown protocol: " + protoName);
           }
           // protocol supported but not the version that client wants
-          throw new RPC.VersionMismatch(protoName, version,
+          throw new RPC.VersionMismatch(protoName, clientVersion,
               highest.version);
         }
         return impl;
@@ -493,7 +494,7 @@ public class ProtobufRpcEngine implements RpcEngine {
           String msg = "Unknown method " + methodName + " called on " + protocol
               + " protocol.";
           LOG.warn(msg);
-          throw new RpcServerException(msg);
+          throw new RpcNoSuchMethodException(msg);
         }
         Message prototype = service.getRequestPrototype(methodDescriptor);
         Message param = prototype.newBuilderForType()

+ 16 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java

@@ -43,6 +43,8 @@ import org.apache.hadoop.io.*;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.ipc.Client.ConnectionId;
 import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolInfoService;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -209,7 +211,7 @@ public class RPC {
   /**
    * A version mismatch for the RPC protocol.
    */
-  public static class VersionMismatch extends IOException {
+  public static class VersionMismatch extends RpcServerException {
     private static final long serialVersionUID = 0;
 
     private String interfaceName;
@@ -253,6 +255,19 @@ public class RPC {
     public long getServerVersion() {
       return serverVersion;
     }
+    /**
+     * get the rpc status corresponding to this exception
+     */
+    public RpcStatusProto getRpcStatusProto() {
+      return RpcStatusProto.ERROR;
+    }
+
+    /**
+     * get the detailed rpc status corresponding to this exception
+     */
+    public RpcErrorCodeProto getRpcErrorCodeProto() {
+      return RpcErrorCodeProto.ERROR_RPC_VERSION_MISMATCH;
+    }
   }
 
   /**

+ 16 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RemoteException.java

@@ -21,22 +21,38 @@ package org.apache.hadoop.ipc;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
 import org.xml.sax.Attributes;
 
 public class RemoteException extends IOException {
   /** For java.io.Serializable */
   private static final long serialVersionUID = 1L;
+  private final int errorCode;
 
   private String className;
   
   public RemoteException(String className, String msg) {
     super(msg);
     this.className = className;
+    errorCode = -1;
+  }
+  
+  public RemoteException(String className, String msg, RpcErrorCodeProto erCode) {
+    super(msg);
+    this.className = className;
+    if (erCode != null)
+      errorCode = erCode.getNumber();
+    else 
+      errorCode = -1;
   }
   
   public String getClassName() {
     return className;
   }
+  
+  public RpcErrorCodeProto getErrorCode() {
+    return RpcErrorCodeProto.valueOf(errorCode);
+  }
 
   /**
    * If this remote exception wraps up one of the lookupTypes

+ 47 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcNoSuchMethodException.java

@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
+
+
+/**
+ * No such Method for an Rpc Call
+ *
+ */
+public class RpcNoSuchMethodException extends RpcServerException {
+  private static final long serialVersionUID = 1L;
+  public RpcNoSuchMethodException(final String message) {
+    super(message);
+  }
+  
+  /**
+   * get the rpc status corresponding to this exception
+   */
+  public RpcStatusProto getRpcStatusProto() {
+    return RpcStatusProto.ERROR;
+  }
+
+  /**
+   * get the detailed rpc status corresponding to this exception
+   */
+  public RpcErrorCodeProto getRpcErrorCodeProto() {
+    return RpcErrorCodeProto.ERROR_NO_SUCH_METHOD;
+  }
+}

+ 46 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcNoSuchProtocolException.java

@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
+
+/**
+ * No such protocol (i.e. interface) for and Rpc Call
+ *
+ */
+public class RpcNoSuchProtocolException extends RpcServerException {
+  private static final long serialVersionUID = 1L;
+  public RpcNoSuchProtocolException(final String message) {
+    super(message);
+  }
+  
+  /**
+   * get the rpc status corresponding to this exception
+   */
+  public RpcStatusProto getRpcStatusProto() {
+    return RpcStatusProto.ERROR;
+  }
+
+  /**
+   * get the detailed rpc status corresponding to this exception
+   */
+  public RpcErrorCodeProto getRpcErrorCodeProto() {
+    return RpcErrorCodeProto.ERROR_NO_SUCH_PROTOCOL;
+  }
+}

+ 17 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcServerException.java

@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.ipc;
 
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
+
 /**
  * Indicates an exception on the RPC server 
  */
@@ -42,4 +45,18 @@ public class RpcServerException extends RpcException {
   public RpcServerException(final String message, final Throwable cause) {
     super(message, cause);
   }
+  
+  /**
+   * get the rpc status corresponding to this exception
+   */
+  public RpcStatusProto getRpcStatusProto() {
+    return RpcStatusProto.ERROR;
+  }
+
+  /**
+   * get the detailed rpc status corresponding to this exception
+   */
+  public RpcErrorCodeProto getRpcErrorCodeProto() {
+    return RpcErrorCodeProto.ERROR_RPC_SERVER;
+  }
 }

+ 75 - 29
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

@@ -82,6 +82,7 @@ import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
 import org.apache.hadoop.ipc.metrics.RpcMetrics;
 import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.*;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
@@ -1435,7 +1436,8 @@ public abstract class Server {
         final String ioeMessage  = ioe.getLocalizedMessage();
         if (authMethod == AuthMethod.SIMPLE) {
           setupResponse(authFailedResponse, authFailedCall,
-              RpcStatusProto.FATAL, null, ioeClass, ioeMessage);
+              RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_UNAUTHORIZED, 
+              null, ioeClass, ioeMessage);
           responder.doRespond(authFailedCall);
         } else {
           doSaslReply(SaslStatus.ERROR, null, ioeClass, ioeMessage);
@@ -1525,7 +1527,8 @@ public abstract class Server {
       if (clientVersion >= 9) {
         // Versions >>9  understand the normal response
         Call fakeCall =  new Call(-1, null, this);
-        setupResponse(buffer, fakeCall, RpcStatusProto.FATAL,
+        setupResponse(buffer, fakeCall, 
+            RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_VERSION_MISMATCH,
             null, VersionMismatch.class.getName(), errMsg);
         responder.doRespond(fakeCall);
       } else if (clientVersion >= 3) {
@@ -1554,8 +1557,9 @@ public abstract class Server {
       ByteArrayOutputStream buffer = new ByteArrayOutputStream();
 
       Call fakeCall = new Call(-1, null, this);
-      setupResponse(buffer, fakeCall, RpcStatusProto.FATAL, null,
-          IpcException.class.getName(), errMsg);
+      setupResponse(buffer, fakeCall, 
+       RpcStatusProto.FATAL,  RpcErrorCodeProto.FATAL_UNSUPPORTED_SERIALIZATION,
+       null, IpcException.class.getName(), errMsg);
       responder.doRespond(fakeCall);
     }
     
@@ -1644,7 +1648,7 @@ public abstract class Server {
     private void processOneRpc(byte[] buf) throws IOException,
         InterruptedException {
       if (connectionContextRead) {
-        processData(buf);
+        processRpcRequest(buf);
       } else {
         processConnectionContext(buf);
         connectionContextRead = true;
@@ -1656,7 +1660,17 @@ public abstract class Server {
       }
     }
     
-    private void processData(byte[] buf) throws  IOException, InterruptedException {
+    /**
+     * Process an RPC Request - the connection headers and context have been
+     * read
+     * @param buf - contains the RPC request header and the rpc request
+     * @throws RpcServerException due to fatal rpc layer issues such as
+     *   invalid header. In this case a RPC fatal status response is sent back
+     *   to client.
+     */
+    
+    private void processRpcRequest(byte[] buf) 
+        throws  RpcServerException, IOException, InterruptedException {
       DataInputStream dis =
         new DataInputStream(new ByteArrayInputStream(buf));
       RpcRequestHeaderProto header = RpcRequestHeaderProto.parseDelimitedFrom(dis);
@@ -1664,51 +1678,58 @@ public abstract class Server {
       if (LOG.isDebugEnabled())
         LOG.debug(" got #" + header.getCallId());
       if (!header.hasRpcOp()) {
-        throw new IOException(" IPC Server: No rpc op in rpcRequestHeader");
+        String err = " IPC Server: No rpc op in rpcRequestHeader";
+        respondBadRpcHeader(new Call(header.getCallId(), null, this),
+            RpcServerException.class.getName(), err);
+        throw new RpcServerException(err);
       }
       if (header.getRpcOp() != 
           RpcRequestHeaderProto.OperationProto.RPC_FINAL_PACKET) {
-        throw new IOException("IPC Server does not implement operation" + 
-              header.getRpcOp());
+        String err = "IPC Server does not implement rpc header operation" + 
+                header.getRpcOp();
+        respondBadRpcHeader(new Call(header.getCallId(), null, this),
+            RpcServerException.class.getName(), err);
+        throw new RpcServerException(err);
       }
       // If we know the rpc kind, get its class so that we can deserialize
       // (Note it would make more sense to have the handler deserialize but 
       // we continue with this original design.
       if (!header.hasRpcKind()) {
-        throw new IOException(" IPC Server: No rpc kind in rpcRequestHeader");
+        String err = " IPC Server: No rpc kind in rpcRequestHeader";
+        respondBadRpcHeader(new Call(header.getCallId(), null, this),
+            RpcServerException.class.getName(), err);
+        throw new RpcServerException(err);
       }
       Class<? extends Writable> rpcRequestClass = 
           getRpcRequestWrapper(header.getRpcKind());
       if (rpcRequestClass == null) {
         LOG.warn("Unknown rpc kind "  + header.getRpcKind() + 
             " from client " + getHostAddress());
-        final Call readParamsFailedCall = 
-            new Call(header.getCallId(), null, this);
-        ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
-
-        setupResponse(responseBuffer, readParamsFailedCall, RpcStatusProto.FATAL, null,
-            IOException.class.getName(),
-            "Unknown rpc kind "  + header.getRpcKind());
-        responder.doRespond(readParamsFailedCall);
-        return;   
+        final String err = "Unknown rpc kind in rpc header"  + 
+            header.getRpcKind();
+        respondBadRpcHeader(new Call(header.getCallId(), null, this),
+            RpcServerException.class.getName(), err);
+        throw new RpcServerException(err);   
       }
       Writable rpcRequest;
       try { //Read the rpc request
         rpcRequest = ReflectionUtils.newInstance(rpcRequestClass, conf);
         rpcRequest.readFields(dis);
-      } catch (Throwable t) {
+      } catch (Throwable t) { // includes runtime exception from newInstance
         LOG.warn("Unable to read call parameters for client " +
                  getHostAddress() + "on connection protocol " +
             this.protocolName + " for rpcKind " + header.getRpcKind(),  t);
         final Call readParamsFailedCall = 
             new Call(header.getCallId(), null, this);
         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
+        String err = "IPC server unable to read call parameters: "+ t.getMessage();
 
-        setupResponse(responseBuffer, readParamsFailedCall, RpcStatusProto.FATAL, null,
-            t.getClass().getName(),
-            "IPC server unable to read call parameters: " + t.getMessage());
+        setupResponse(responseBuffer, readParamsFailedCall, 
+            RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST,
+            null, t.getClass().getName(),
+            err);
         responder.doRespond(readParamsFailedCall);
-        return;
+        throw new RpcServerException(err, t);
       }
         
       Call call = new Call(header.getCallId(), rpcRequest, this, 
@@ -1734,7 +1755,8 @@ public abstract class Server {
         rpcMetrics.incrAuthorizationSuccesses();
       } catch (AuthorizationException ae) {
         rpcMetrics.incrAuthorizationFailures();
-        setupResponse(authFailedResponse, authFailedCall, RpcStatusProto.FATAL, null,
+        setupResponse(authFailedResponse, authFailedCall, 
+            RpcStatusProto.FATAL,  RpcErrorCodeProto.FATAL_UNAUTHORIZED, null,
             ae.getClass().getName(), ae.getMessage());
         responder.doRespond(authFailedCall);
         return false;
@@ -1796,6 +1818,8 @@ public abstract class Server {
           }
           String errorClass = null;
           String error = null;
+          RpcStatusProto returnStatus = RpcStatusProto.SUCCESS;
+          RpcErrorCodeProto detailedErr = null;
           Writable value = null;
 
           CurCall.set(call);
@@ -1836,7 +1860,14 @@ public abstract class Server {
             } else {
               LOG.info(logMsg, e);
             }
-
+            if (e instanceof RpcServerException) {
+              RpcServerException rse = ((RpcServerException)e); 
+              returnStatus = rse.getRpcStatusProto();
+              detailedErr = rse.getRpcErrorCodeProto();
+            } else {
+              returnStatus = RpcStatusProto.ERROR;
+              detailedErr = RpcErrorCodeProto.ERROR_APPLICATION;
+            }
             errorClass = e.getClass().getName();
             error = StringUtils.stringifyException(e);
             // Remove redundant error class name from the beginning of the stack trace
@@ -1851,8 +1882,8 @@ public abstract class Server {
             // responder.doResponse() since setupResponse may use
             // SASL to encrypt response data and SASL enforces
             // its own message ordering.
-            setupResponse(buf, call, (error == null) ? RpcStatusProto.SUCCESS
-                : RpcStatusProto.ERROR, value, errorClass, error);
+            setupResponse(buf, call, returnStatus, detailedErr, 
+                value, errorClass, error);
             
             // Discard the large buf and reset it back to smaller size 
             // to free up heap
@@ -2023,7 +2054,7 @@ public abstract class Server {
    * @throws IOException
    */
   private void setupResponse(ByteArrayOutputStream responseBuf,
-                             Call call, RpcStatusProto status, 
+                             Call call, RpcStatusProto status, RpcErrorCodeProto erCode,
                              Writable rv, String errorClass, String error) 
   throws IOException {
     responseBuf.reset();
@@ -2062,6 +2093,7 @@ public abstract class Server {
         // buffer is reset at the top, and since status is changed
         // to ERROR it won't infinite loop.
         setupResponse(responseBuf, call, RpcStatusProto.ERROR,
+            RpcErrorCodeProto.ERROR_SERIALIZING_RESPONSE,
             null, t.getClass().getName(),
             StringUtils.stringifyException(t));
         return;
@@ -2069,6 +2101,7 @@ public abstract class Server {
     } else { // Rpc Failure
       headerBuilder.setExceptionClassName(errorClass);
       headerBuilder.setErrorMsg(error);
+      headerBuilder.setErrorDetail(erCode);
       RpcResponseHeaderProto header = headerBuilder.build();
       int headerLen = header.getSerializedSize();
       final int fullLength  = 
@@ -2113,6 +2146,19 @@ public abstract class Server {
     call.setResponse(ByteBuffer.wrap(response.toByteArray()));
   }
   
+  
+  private void respondBadRpcHeader(Call call, String errorClass, String error)
+      throws IOException
+  {
+    ByteArrayOutputStream responseBuf = new ByteArrayOutputStream();
+    setupResponse(responseBuf, call, 
+        RpcStatusProto.FATAL,  RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
+        null, errorClass, error);
+    responder.doRespond(call);
+    return; 
+    
+  }
+  
   private void wrapWithSasl(ByteArrayOutputStream response, Call call)
       throws IOException {
     if (call.connection.saslServer != null) {

+ 49 - 49
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java

@@ -416,62 +416,62 @@ public class WritableRpcEngine implements RpcEngine {
      @Override
       public Writable call(org.apache.hadoop.ipc.RPC.Server server,
           String protocolName, Writable rpcRequest, long receivedTime)
-          throws IOException {
-        try {
-          Invocation call = (Invocation)rpcRequest;
-          if (server.verbose) log("Call: " + call);
-
-          // Verify rpc version
-          if (call.getRpcVersion() != writableRpcVersion) {
-            // Client is using a different version of WritableRpc
-            throw new IOException(
-                "WritableRpc version mismatch, client side version="
-                    + call.getRpcVersion() + ", server side version="
-                    + writableRpcVersion);
-          }
+          throws IOException, RPC.VersionMismatch {
+
+        Invocation call = (Invocation)rpcRequest;
+        if (server.verbose) log("Call: " + call);
+
+        // Verify writable rpc version
+        if (call.getRpcVersion() != writableRpcVersion) {
+          // Client is using a different version of WritableRpc
+          throw new RpcServerException(
+              "WritableRpc version mismatch, client side version="
+                  + call.getRpcVersion() + ", server side version="
+                  + writableRpcVersion);
+        }
 
-          long clientVersion = call.getProtocolVersion();
-          final String protoName;
-          ProtoClassProtoImpl protocolImpl;
-          if (call.declaringClassProtocolName.equals(VersionedProtocol.class.getName())) {
-            // VersionProtocol methods are often used by client to figure out
-            // which version of protocol to use.
-            //
-            // Versioned protocol methods should go the protocolName protocol
-            // rather than the declaring class of the method since the
-            // the declaring class is VersionedProtocol which is not 
-            // registered directly.
-            // Send the call to the highest  protocol version
-            VerProtocolImpl highest = server.getHighestSupportedProtocol(
-                RPC.RpcKind.RPC_WRITABLE, protocolName);
+        long clientVersion = call.getProtocolVersion();
+        final String protoName;
+        ProtoClassProtoImpl protocolImpl;
+        if (call.declaringClassProtocolName.equals(VersionedProtocol.class.getName())) {
+          // VersionProtocol methods are often used by client to figure out
+          // which version of protocol to use.
+          //
+          // Versioned protocol methods should go the protocolName protocol
+          // rather than the declaring class of the method since the
+          // the declaring class is VersionedProtocol which is not 
+          // registered directly.
+          // Send the call to the highest  protocol version
+          VerProtocolImpl highest = server.getHighestSupportedProtocol(
+              RPC.RpcKind.RPC_WRITABLE, protocolName);
+          if (highest == null) {
+            throw new RpcServerException("Unknown protocol: " + protocolName);
+          }
+          protocolImpl = highest.protocolTarget;
+        } else {
+          protoName = call.declaringClassProtocolName;
+
+          // Find the right impl for the protocol based on client version.
+          ProtoNameVer pv = 
+              new ProtoNameVer(call.declaringClassProtocolName, clientVersion);
+          protocolImpl = 
+              server.getProtocolImplMap(RPC.RpcKind.RPC_WRITABLE).get(pv);
+          if (protocolImpl == null) { // no match for Protocol AND Version
+             VerProtocolImpl highest = 
+                 server.getHighestSupportedProtocol(RPC.RpcKind.RPC_WRITABLE, 
+                     protoName);
             if (highest == null) {
-              throw new IOException("Unknown protocol: " + protocolName);
-            }
-            protocolImpl = highest.protocolTarget;
-          } else {
-            protoName = call.declaringClassProtocolName;
-
-            // Find the right impl for the protocol based on client version.
-            ProtoNameVer pv = 
-                new ProtoNameVer(call.declaringClassProtocolName, clientVersion);
-            protocolImpl = 
-                server.getProtocolImplMap(RPC.RpcKind.RPC_WRITABLE).get(pv);
-            if (protocolImpl == null) { // no match for Protocol AND Version
-               VerProtocolImpl highest = 
-                   server.getHighestSupportedProtocol(RPC.RpcKind.RPC_WRITABLE, 
-                       protoName);
-              if (highest == null) {
-                throw new IOException("Unknown protocol: " + protoName);
-              } else { // protocol supported but not the version that client wants
-                throw new RPC.VersionMismatch(protoName, clientVersion,
-                  highest.version);
-              }
+              throw new RpcServerException("Unknown protocol: " + protoName);
+            } else { // protocol supported but not the version that client wants
+              throw new RPC.VersionMismatch(protoName, clientVersion,
+                highest.version);
             }
           }
+        }
           
 
           // Invoke the protocol method
-
+       try {
           long startTime = Time.now();
           Method method = 
               protocolImpl.protocolClass.getMethod(call.getMethodName(),

+ 1 - 1
hadoop-common-project/hadoop-common/src/main/proto/ProtobufRpcEngine.proto

@@ -1,4 +1,4 @@
-/**DER
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

+ 34 - 5
hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto

@@ -62,27 +62,55 @@ message RpcRequestHeaderProto { // the header for the RpcRequest
 
 /**
  * Rpc Response Header
- * ** If request is successfull response is returned as below ********
  * +------------------------------------------------------------------+
- * | Rpc reponse length in bytes (4 bytes int)                        |
+ * | Rpc total response length in bytes (4 bytes int)                 |
  * |  (sum of next two parts)                                         |
  * +------------------------------------------------------------------+
  * | RpcResponseHeaderProto - serialized delimited ie has len         |
  * +------------------------------------------------------------------+
  * | if request is successful:                                        |
  * |   - RpcResponse -  The actual rpc response  bytes follow         |
- *       the response header                                          |
+ * |     the response header                                          |
  * |     This response is serialized based on RpcKindProto            |
  * | if request fails :                                               |
  * |   The rpc response header contains the necessary info            |
  * +------------------------------------------------------------------+
  *
+ * Note that rpc response header is also used when connection setup fails. 
+ * Ie the response looks like a rpc response with a fake callId.
  */
 message RpcResponseHeaderProto {
+  /**
+    * 
+    * RpcStastus - success or failure
+    * The reponseHeader's errDetail,  exceptionClassName and errMsg contains
+    * further details on the error
+    **/
+
   enum RpcStatusProto {
    SUCCESS = 0;  // RPC succeeded
-   ERROR = 1;    // RPC Failed
-   FATAL = 2;    // Fatal error - connection is closed
+   ERROR = 1;    // RPC or error - connection left open for future calls
+   FATAL = 2;    // Fatal error - connection closed
+  }
+
+  enum RpcErrorCodeProto {
+
+   // Non-fatal Rpc error - connection left open for future rpc calls
+   ERROR_APPLICATION = 1;      // RPC Failed - rpc app threw exception
+   ERROR_NO_SUCH_METHOD = 2;   // Rpc error - no such method
+   ERROR_NO_SUCH_PROTOCOL = 3; // Rpc error - no such protocol
+   ERROR_RPC_SERVER  = 4;      // Rpc error on server side
+   ERROR_SERIALIZING_RESPONSE = 5; // error serializign response
+   ERROR_RPC_VERSION_MISMATCH = 6; // Rpc protocol version mismatch
+
+
+   // Fatal Server side Rpc error - connection closed
+   FATAL_UNKNOWN = 10;                   // unknown Fatal error
+   FATAL_UNSUPPORTED_SERIALIZATION = 11; // IPC layer serilization type invalid
+   FATAL_INVALID_RPC_HEADER = 12;        // fields of RpcHeader are invalid
+   FATAL_DESERIALIZING_REQUEST = 13;     // could not deserilize rpc request
+   FATAL_VERSION_MISMATCH = 14;          // Ipc Layer version mismatch
+   FATAL_UNAUTHORIZED = 15;              // Auth failed
   }
 
   required uint32 callId = 1; // callId used in Request
@@ -90,4 +118,5 @@ message RpcResponseHeaderProto {
   optional uint32 serverIpcVersionNum = 3; // Sent if success or fail
   optional string exceptionClassName = 4;  // if request fails
   optional string errorMsg = 5;  // if request fails, often contains strack trace
+  optional RpcErrorCodeProto errorDetail = 6; // in case of error
 }

+ 5 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java

@@ -25,6 +25,7 @@ import java.net.InetSocketAddress;
 import java.net.URISyntaxException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
 import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto;
 import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto;
 import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto;
@@ -183,6 +184,8 @@ public class TestProtoBufRpc {
       RemoteException re = (RemoteException)e.getCause();
       RpcServerException rse = (RpcServerException) re
           .unwrapRemoteException(RpcServerException.class);
+      Assert.assertTrue(re.getErrorCode().equals(
+          RpcErrorCodeProto.ERROR_RPC_SERVER));
     }
   }
   
@@ -223,6 +226,8 @@ public class TestProtoBufRpc {
       Assert.assertTrue(re.getClassName().equals(
           URISyntaxException.class.getName()));
       Assert.assertTrue(re.getMessage().contains("testException"));
+      Assert.assertTrue(
+          re.getErrorCode().equals(RpcErrorCodeProto.ERROR_APPLICATION));
     }
   }
 }

+ 8 - 3
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java

@@ -34,6 +34,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto;
 import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto;
 import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
 import org.apache.hadoop.net.NetUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -310,9 +311,13 @@ System.out.println("echo int is NOT supported");
     try {
       proxy.echo(21);
       fail("The call must throw VersionMismatch exception");
-    } catch (IOException ex) {
-      Assert.assertTrue("Expected version mismatch but got " + ex.getMessage(), 
-          ex.getMessage().contains("VersionMismatch"));
+    } catch (RemoteException ex) {
+      Assert.assertEquals(RPC.VersionMismatch.class.getName(), 
+          ex.getClassName());
+      Assert.assertTrue(ex.getErrorCode().equals(
+          RpcErrorCodeProto.ERROR_RPC_VERSION_MISMATCH));
+    }  catch (IOException ex) {
+      fail("Expected version mismatch but got " + ex);
     }
   }