Browse Source

HADOOP-10940. RPC client does no bounds checking of responses. Contributed by Daryn Sharp.

Kihwal Lee 8 years ago
parent
commit
d4d076876a

+ 2 - 8
hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml

@@ -44,7 +44,7 @@
      --> 
      --> 
      <Match>
      <Match>
        <Class name="org.apache.hadoop.ipc.Client$Connection" />
        <Class name="org.apache.hadoop.ipc.Client$Connection" />
-       <Field name="out" />
+       <Field name="ipcStreams" />
        <Bug pattern="IS2_INCONSISTENT_SYNC" />
        <Bug pattern="IS2_INCONSISTENT_SYNC" />
      </Match>
      </Match>
     <!--
     <!--
@@ -341,13 +341,7 @@
        <Method name="removeRenewAction" />
        <Method name="removeRenewAction" />
        <Bug pattern="BC_UNCONFIRMED_CAST" />
        <Bug pattern="BC_UNCONFIRMED_CAST" />
      </Match>
      </Match>
-     
-     <!-- Inconsistent synchronization flagged by findbugs is not valid. -->
-     <Match>
-       <Class name="org.apache.hadoop.ipc.Client$Connection" />
-       <Field name="in" />
-       <Bug pattern="IS2_INCONSISTENT_SYNC" />
-     </Match>
+
      <!-- 
      <!-- 
        The switch condition for INITIATE is expected to fallthru to RESPONSE
        The switch condition for INITIATE is expected to fallthru to RESPONSE
        to process initial sasl response token included in the INITIATE
        to process initial sasl response token included in the INITIATE

+ 10 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java

@@ -78,12 +78,20 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
   /** Default value for IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE */
   /** Default value for IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE */
   public static final int IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT =
   public static final int IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT =
       100;
       100;
-      
+
+  /** Max request size a server will accept. */
   public static final String IPC_MAXIMUM_DATA_LENGTH =
   public static final String IPC_MAXIMUM_DATA_LENGTH =
       "ipc.maximum.data.length";
       "ipc.maximum.data.length";
-  
+  /** Default value for IPC_MAXIMUM_DATA_LENGTH. */
   public static final int IPC_MAXIMUM_DATA_LENGTH_DEFAULT = 64 * 1024 * 1024;
   public static final int IPC_MAXIMUM_DATA_LENGTH_DEFAULT = 64 * 1024 * 1024;
 
 
+  /** Max response size a client will accept. */
+  public static final String IPC_MAXIMUM_RESPONSE_LENGTH =
+      "ipc.maximum.response.length";
+  /** Default value for IPC_MAXIMUM_RESPONSE_LENGTH. */
+  public static final int IPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT =
+      128 * 1024 * 1024;
+
   /** How many calls per handler are allowed in the queue. */
   /** How many calls per handler are allowed in the queue. */
   public static final String  IPC_SERVER_HANDLER_QUEUE_SIZE_KEY =
   public static final String  IPC_SERVER_HANDLER_QUEUE_SIZE_KEY =
     "ipc.server.handler.queue.size";
     "ipc.server.handler.queue.size";

+ 128 - 50
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
 import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
@@ -413,8 +414,8 @@ public class Client implements AutoCloseable {
     private SaslRpcClient saslRpcClient;
     private SaslRpcClient saslRpcClient;
     
     
     private Socket socket = null;                 // connected socket
     private Socket socket = null;                 // connected socket
-    private DataInputStream in;
-    private DataOutputStream out;
+    private IpcStreams ipcStreams;
+    private final int maxResponseLength;
     private final int rpcTimeout;
     private final int rpcTimeout;
     private int maxIdleTime; //connections will be culled if it was idle for 
     private int maxIdleTime; //connections will be culled if it was idle for 
     //maxIdleTime msecs
     //maxIdleTime msecs
@@ -426,8 +427,8 @@ public class Client implements AutoCloseable {
     private final boolean doPing; //do we need to send ping message
     private final boolean doPing; //do we need to send ping message
     private final int pingInterval; // how often sends ping to the server
     private final int pingInterval; // how often sends ping to the server
     private final int soTimeout; // used by ipc ping and rpc timeout
     private final int soTimeout; // used by ipc ping and rpc timeout
-    private ResponseBuffer pingRequest; // ping message
-    
+    private byte[] pingRequest; // ping message
+
     // currently active calls
     // currently active calls
     private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
     private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
     private AtomicLong lastActivity = new AtomicLong();// last I/O activity time
     private AtomicLong lastActivity = new AtomicLong();// last I/O activity time
@@ -446,6 +447,9 @@ public class Client implements AutoCloseable {
             0,
             0,
             new UnknownHostException());
             new UnknownHostException());
       }
       }
+      this.maxResponseLength = remoteId.conf.getInt(
+          CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH,
+          CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT);
       this.rpcTimeout = remoteId.getRpcTimeout();
       this.rpcTimeout = remoteId.getRpcTimeout();
       this.maxIdleTime = remoteId.getMaxIdleTime();
       this.maxIdleTime = remoteId.getMaxIdleTime();
       this.connectionRetryPolicy = remoteId.connectionRetryPolicy;
       this.connectionRetryPolicy = remoteId.connectionRetryPolicy;
@@ -456,12 +460,13 @@ public class Client implements AutoCloseable {
       this.doPing = remoteId.getDoPing();
       this.doPing = remoteId.getDoPing();
       if (doPing) {
       if (doPing) {
         // construct a RPC header with the callId as the ping callId
         // construct a RPC header with the callId as the ping callId
-        pingRequest = new ResponseBuffer();
+        ResponseBuffer buf = new ResponseBuffer();
         RpcRequestHeaderProto pingHeader = ProtoUtil
         RpcRequestHeaderProto pingHeader = ProtoUtil
             .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
             .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
                 OperationProto.RPC_FINAL_PACKET, PING_CALL_ID,
                 OperationProto.RPC_FINAL_PACKET, PING_CALL_ID,
                 RpcConstants.INVALID_RETRY_COUNT, clientId);
                 RpcConstants.INVALID_RETRY_COUNT, clientId);
-        pingHeader.writeDelimitedTo(pingRequest);
+        pingHeader.writeDelimitedTo(buf);
+        pingRequest = buf.toByteArray();
       }
       }
       this.pingInterval = remoteId.getPingInterval();
       this.pingInterval = remoteId.getPingInterval();
       if (rpcTimeout > 0) {
       if (rpcTimeout > 0) {
@@ -596,15 +601,15 @@ public class Client implements AutoCloseable {
       }
       }
       return false;
       return false;
     }
     }
-    
-    private synchronized AuthMethod setupSaslConnection(final InputStream in2, 
-        final OutputStream out2) throws IOException {
+
+    private synchronized AuthMethod setupSaslConnection(IpcStreams streams)
+        throws IOException {
       // Do not use Client.conf here! We must use ConnectionId.conf, since the
       // Do not use Client.conf here! We must use ConnectionId.conf, since the
       // Client object is cached and shared between all RPC clients, even those
       // Client object is cached and shared between all RPC clients, even those
       // for separate services.
       // for separate services.
       saslRpcClient = new SaslRpcClient(remoteId.getTicket(),
       saslRpcClient = new SaslRpcClient(remoteId.getTicket(),
           remoteId.getProtocol(), remoteId.getAddress(), remoteId.conf);
           remoteId.getProtocol(), remoteId.getAddress(), remoteId.conf);
-      return saslRpcClient.saslConnect(in2, out2);
+      return saslRpcClient.saslConnect(streams);
     }
     }
 
 
     /**
     /**
@@ -770,12 +775,9 @@ public class Client implements AutoCloseable {
         Random rand = null;
         Random rand = null;
         while (true) {
         while (true) {
           setupConnection();
           setupConnection();
-          InputStream inStream = NetUtils.getInputStream(socket);
-          OutputStream outStream = NetUtils.getOutputStream(socket);
-          writeConnectionHeader(outStream);
+          ipcStreams = new IpcStreams(socket, maxResponseLength);
+          writeConnectionHeader(ipcStreams);
           if (authProtocol == AuthProtocol.SASL) {
           if (authProtocol == AuthProtocol.SASL) {
-            final InputStream in2 = inStream;
-            final OutputStream out2 = outStream;
             UserGroupInformation ticket = remoteId.getTicket();
             UserGroupInformation ticket = remoteId.getTicket();
             if (ticket.getRealUser() != null) {
             if (ticket.getRealUser() != null) {
               ticket = ticket.getRealUser();
               ticket = ticket.getRealUser();
@@ -786,7 +788,7 @@ public class Client implements AutoCloseable {
                     @Override
                     @Override
                     public AuthMethod run()
                     public AuthMethod run()
                         throws IOException, InterruptedException {
                         throws IOException, InterruptedException {
-                      return setupSaslConnection(in2, out2);
+                      return setupSaslConnection(ipcStreams);
                     }
                     }
                   });
                   });
             } catch (IOException ex) {
             } catch (IOException ex) {
@@ -805,8 +807,7 @@ public class Client implements AutoCloseable {
             }
             }
             if (authMethod != AuthMethod.SIMPLE) {
             if (authMethod != AuthMethod.SIMPLE) {
               // Sasl connect is successful. Let's set up Sasl i/o streams.
               // Sasl connect is successful. Let's set up Sasl i/o streams.
-              inStream = saslRpcClient.getInputStream(inStream);
-              outStream = saslRpcClient.getOutputStream(outStream);
+              ipcStreams.setSaslClient(saslRpcClient);
               // for testing
               // for testing
               remoteId.saslQop =
               remoteId.saslQop =
                   (String)saslRpcClient.getNegotiatedProperty(Sasl.QOP);
                   (String)saslRpcClient.getNegotiatedProperty(Sasl.QOP);
@@ -825,18 +826,11 @@ public class Client implements AutoCloseable {
               }
               }
             }
             }
           }
           }
-        
+
           if (doPing) {
           if (doPing) {
-            inStream = new PingInputStream(inStream);
+            ipcStreams.setInputStream(new PingInputStream(ipcStreams.in));
           }
           }
-          this.in = new DataInputStream(new BufferedInputStream(inStream));
 
 
-          // SASL may have already buffered the stream
-          if (!(outStream instanceof BufferedOutputStream)) {
-            outStream = new BufferedOutputStream(outStream);
-          }
-          this.out = new DataOutputStream(outStream);
-          
           writeConnectionContext(remoteId, authMethod);
           writeConnectionContext(remoteId, authMethod);
 
 
           // update last activity time
           // update last activity time
@@ -950,17 +944,28 @@ public class Client implements AutoCloseable {
      * |  AuthProtocol (1 byte)           |      
      * |  AuthProtocol (1 byte)           |      
      * +----------------------------------+
      * +----------------------------------+
      */
      */
-    private void writeConnectionHeader(OutputStream outStream)
+    private void writeConnectionHeader(IpcStreams streams)
         throws IOException {
         throws IOException {
-      DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream));
-      // Write out the header, version and authentication method
-      out.write(RpcConstants.HEADER.array());
-      out.write(RpcConstants.CURRENT_VERSION);
-      out.write(serviceClass);
-      out.write(authProtocol.callId);
-      out.flush();
+      // Write out the header, version and authentication method.
+      // The output stream is buffered but we must not flush it yet.  The
+      // connection setup protocol requires the client to send multiple
+      // messages before reading a response.
+      //
+      //   insecure: send header+context+call, read
+      //   secure  : send header+negotiate, read, (sasl), context+call, read
+      //
+      // The client must flush only when it's prepared to read.  Otherwise
+      // "broken pipe" exceptions occur if the server closes the connection
+      // before all messages are sent.
+      final DataOutputStream out = streams.out;
+      synchronized (out) {
+        out.write(RpcConstants.HEADER.array());
+        out.write(RpcConstants.CURRENT_VERSION);
+        out.write(serviceClass);
+        out.write(authProtocol.callId);
+      }
     }
     }
-    
+
     /* Write the connection context header for each connection
     /* Write the connection context header for each connection
      * Out is not synchronized because only the first thread does this.
      * Out is not synchronized because only the first thread does this.
      */
      */
@@ -976,12 +981,17 @@ public class Client implements AutoCloseable {
           .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
           .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
               OperationProto.RPC_FINAL_PACKET, CONNECTION_CONTEXT_CALL_ID,
               OperationProto.RPC_FINAL_PACKET, CONNECTION_CONTEXT_CALL_ID,
               RpcConstants.INVALID_RETRY_COUNT, clientId);
               RpcConstants.INVALID_RETRY_COUNT, clientId);
+      // do not flush.  the context and first ipc call request must be sent
+      // together to avoid possibility of broken pipes upon authz failure.
+      // see writeConnectionHeader
       final ResponseBuffer buf = new ResponseBuffer();
       final ResponseBuffer buf = new ResponseBuffer();
       connectionContextHeader.writeDelimitedTo(buf);
       connectionContextHeader.writeDelimitedTo(buf);
       message.writeDelimitedTo(buf);
       message.writeDelimitedTo(buf);
-      buf.writeTo(out);
+      synchronized (ipcStreams.out) {
+        ipcStreams.sendRequest(buf.toByteArray());
+      }
     }
     }
-    
+
     /* wait till someone signals us to start reading RPC response or
     /* wait till someone signals us to start reading RPC response or
      * it is idle too long, it is marked as to be closed, 
      * it is idle too long, it is marked as to be closed, 
      * or the client is marked as not running.
      * or the client is marked as not running.
@@ -1024,9 +1034,9 @@ public class Client implements AutoCloseable {
       long curTime = Time.now();
       long curTime = Time.now();
       if ( curTime - lastActivity.get() >= pingInterval) {
       if ( curTime - lastActivity.get() >= pingInterval) {
         lastActivity.set(curTime);
         lastActivity.set(curTime);
-        synchronized (out) {
-          pingRequest.writeTo(out);
-          out.flush();
+        synchronized (ipcStreams.out) {
+          ipcStreams.sendRequest(pingRequest);
+          ipcStreams.flush();
         }
         }
       }
       }
     }
     }
@@ -1092,15 +1102,16 @@ public class Client implements AutoCloseable {
           @Override
           @Override
           public void run() {
           public void run() {
             try {
             try {
-              synchronized (Connection.this.out) {
+              synchronized (ipcStreams.out) {
                 if (shouldCloseConnection.get()) {
                 if (shouldCloseConnection.get()) {
                   return;
                   return;
                 }
                 }
                 if (LOG.isDebugEnabled()) {
                 if (LOG.isDebugEnabled()) {
                   LOG.debug(getName() + " sending #" + call.id);
                   LOG.debug(getName() + " sending #" + call.id);
                 }
                 }
-                buf.writeTo(out); // RpcRequestHeader + RpcRequest
-                out.flush();
+                // RpcRequestHeader + RpcRequest
+                ipcStreams.sendRequest(buf.toByteArray());
+                ipcStreams.flush();
               }
               }
             } catch (IOException e) {
             } catch (IOException e) {
               // exception at this point would leave the connection in an
               // exception at this point would leave the connection in an
@@ -1141,10 +1152,7 @@ public class Client implements AutoCloseable {
       touch();
       touch();
       
       
       try {
       try {
-        int totalLen = in.readInt();
-        ByteBuffer bb = ByteBuffer.allocate(totalLen);
-        in.readFully(bb.array());
-
+        ByteBuffer bb = ipcStreams.readResponse();
         RpcWritable.Buffer packet = RpcWritable.Buffer.wrap(bb);
         RpcWritable.Buffer packet = RpcWritable.Buffer.wrap(bb);
         RpcResponseHeaderProto header =
         RpcResponseHeaderProto header =
             packet.getValue(RpcResponseHeaderProto.getDefaultInstance());
             packet.getValue(RpcResponseHeaderProto.getDefaultInstance());
@@ -1209,8 +1217,7 @@ public class Client implements AutoCloseable {
       connections.remove(remoteId, this);
       connections.remove(remoteId, this);
 
 
       // close the streams and therefore the socket
       // close the streams and therefore the socket
-      IOUtils.closeStream(out);
-      IOUtils.closeStream(in);
+      IOUtils.closeStream(ipcStreams);
       disposeSasl();
       disposeSasl();
 
 
       // clean up all calls
       // clean up all calls
@@ -1739,4 +1746,75 @@ public class Client implements AutoCloseable {
   public void close() throws Exception {
   public void close() throws Exception {
     stop();
     stop();
   }
   }
+
+  /** Manages the input and output streams for an IPC connection.
+   *  Only exposed for use by SaslRpcClient.
+   */
+  @InterfaceAudience.Private
+  public static class IpcStreams implements Closeable, Flushable {
+    private DataInputStream in;
+    public DataOutputStream out;
+    private int maxResponseLength;
+    private boolean firstResponse = true;
+
+    IpcStreams(Socket socket, int maxResponseLength) throws IOException {
+      this.maxResponseLength = maxResponseLength;
+      setInputStream(
+          new BufferedInputStream(NetUtils.getInputStream(socket)));
+      setOutputStream(
+          new BufferedOutputStream(NetUtils.getOutputStream(socket)));
+    }
+
+    void setSaslClient(SaslRpcClient client) throws IOException {
+      setInputStream(client.getInputStream(in));
+      setOutputStream(client.getOutputStream(out));
+    }
+
+    private void setInputStream(InputStream is) {
+      this.in = (is instanceof DataInputStream)
+          ? (DataInputStream)is : new DataInputStream(is);
+    }
+
+    private void setOutputStream(OutputStream os) {
+      this.out = (os instanceof DataOutputStream)
+          ? (DataOutputStream)os : new DataOutputStream(os);
+    }
+
+    public ByteBuffer readResponse() throws IOException {
+      int length = in.readInt();
+      if (firstResponse) {
+        firstResponse = false;
+        // pre-rpcv9 exception, almost certainly a version mismatch.
+        if (length == -1) {
+          in.readInt(); // ignore fatal/error status, it's fatal for us.
+          throw new RemoteException(WritableUtils.readString(in),
+                                    WritableUtils.readString(in));
+        }
+      }
+      if (length <= 0) {
+        throw new RpcException("RPC response has invalid length");
+      }
+      if (maxResponseLength > 0 && length > maxResponseLength) {
+        throw new RpcException("RPC response exceeds maximum data length");
+      }
+      ByteBuffer bb = ByteBuffer.allocate(length);
+      in.readFully(bb.array());
+      return bb;
+    }
+
+    public void sendRequest(byte[] buf) throws IOException {
+      out.write(buf);
+    }
+
+    @Override
+    public void flush() throws IOException {
+      out.flush();
+    }
+
+    @Override
+    public void close() {
+      IOUtils.closeStream(out);
+      IOUtils.closeStream(in);
+    }
+  }
 }
 }

+ 9 - 16
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java

@@ -18,11 +18,9 @@
 
 
 package org.apache.hadoop.security;
 package org.apache.hadoop.security;
 
 
-import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.BufferedOutputStream;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.FilterInputStream;
 import java.io.FilterInputStream;
 import java.io.FilterOutputStream;
 import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.io.IOException;
@@ -53,6 +51,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.GlobPattern;
 import org.apache.hadoop.fs.GlobPattern;
+import org.apache.hadoop.ipc.Client.IpcStreams;
 import org.apache.hadoop.ipc.RPC.RpcKind;
 import org.apache.hadoop.ipc.RPC.RpcKind;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.ResponseBuffer;
 import org.apache.hadoop.ipc.ResponseBuffer;
@@ -353,24 +352,16 @@ public class SaslRpcClient {
    * @return AuthMethod used to negotiate the connection
    * @return AuthMethod used to negotiate the connection
    * @throws IOException
    * @throws IOException
    */
    */
-  public AuthMethod saslConnect(InputStream inS, OutputStream outS)
-      throws IOException {
-    DataInputStream inStream = new DataInputStream(new BufferedInputStream(inS));
-    DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream(
-        outS));
-    
+  public AuthMethod saslConnect(IpcStreams ipcStreams) throws IOException {
     // redefined if/when a SASL negotiation starts, can be queried if the
     // redefined if/when a SASL negotiation starts, can be queried if the
     // negotiation fails
     // negotiation fails
     authMethod = AuthMethod.SIMPLE;
     authMethod = AuthMethod.SIMPLE;
 
 
-    sendSaslMessage(outStream, negotiateRequest);
-
+    sendSaslMessage(ipcStreams.out, negotiateRequest);
     // loop until sasl is complete or a rpc error occurs
     // loop until sasl is complete or a rpc error occurs
     boolean done = false;
     boolean done = false;
     do {
     do {
-      int rpcLen = inStream.readInt();
-      ByteBuffer bb = ByteBuffer.allocate(rpcLen);
-      inStream.readFully(bb.array());
+      ByteBuffer bb = ipcStreams.readResponse();
 
 
       RpcWritable.Buffer saslPacket = RpcWritable.Buffer.wrap(bb);
       RpcWritable.Buffer saslPacket = RpcWritable.Buffer.wrap(bb);
       RpcResponseHeaderProto header =
       RpcResponseHeaderProto header =
@@ -447,7 +438,7 @@ public class SaslRpcClient {
         }
         }
       }
       }
       if (response != null) {
       if (response != null) {
-        sendSaslMessage(outStream, response.build());
+        sendSaslMessage(ipcStreams.out, response.build());
       }
       }
     } while (!done);
     } while (!done);
     return authMethod;
     return authMethod;
@@ -461,8 +452,10 @@ public class SaslRpcClient {
     ResponseBuffer buf = new ResponseBuffer();
     ResponseBuffer buf = new ResponseBuffer();
     saslHeader.writeDelimitedTo(buf);
     saslHeader.writeDelimitedTo(buf);
     message.writeDelimitedTo(buf);
     message.writeDelimitedTo(buf);
-    buf.writeTo(out);
-    out.flush();
+    synchronized (out) {
+      buf.writeTo(out);
+      out.flush();
+    }
   }
   }
 
 
   /**
   /**

+ 13 - 4
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

@@ -1313,10 +1313,19 @@
   <name>ipc.maximum.data.length</name>
   <name>ipc.maximum.data.length</name>
   <value>67108864</value>
   <value>67108864</value>
   <description>This indicates the maximum IPC message length (bytes) that can be
   <description>This indicates the maximum IPC message length (bytes) that can be
-    accepted by the server. Messages larger than this value are rejected by
-    server immediately. This setting should rarely need to be changed. It merits
-    investigating whether the cause of long RPC messages can be fixed instead,
-    e.g. by splitting into smaller messages.
+    accepted by the server. Messages larger than this value are rejected by the
+    immediately to avoid possible OOMs. This setting should rarely need to be
+    changed.
+  </description>
+</property>
+
+<property>
+  <name>ipc.maximum.response.length</name>
+  <value>134217728</value>
+  <description>This indicates the maximum IPC message length (bytes) that can be
+    accepted by the client. Messages larger than this value are rejected
+    immediately to avoid possible OOMs. This setting should rarely need to be
+    changed.  Set to 0 to disable.
   </description>
   </description>
 </property>
 </property>
 
 

+ 84 - 2
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java

@@ -40,6 +40,7 @@ import java.io.OutputStream;
 import java.lang.reflect.Method;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.lang.reflect.Proxy;
 import java.net.InetSocketAddress;
 import java.net.InetSocketAddress;
+import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.ArrayList;
@@ -49,6 +50,8 @@ import java.util.Random;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -76,6 +79,9 @@ import org.apache.hadoop.ipc.Server.Connection;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
 import org.apache.hadoop.net.ConnectTimeoutException;
 import org.apache.hadoop.net.ConnectTimeoutException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.StringUtils;
@@ -112,6 +118,8 @@ public class TestIPC {
   public void setupConf() {
   public void setupConf() {
     conf = new Configuration();
     conf = new Configuration();
     Client.setPingInterval(conf, PING_INTERVAL);
     Client.setPingInterval(conf, PING_INTERVAL);
+    // tests may enable security, so disable before each test
+    UserGroupInformation.setConfiguration(conf);
   }
   }
 
 
   static final Random RANDOM = new Random();
   static final Random RANDOM = new Random();
@@ -123,8 +131,8 @@ public class TestIPC {
 
 
   static ConnectionId getConnectionId(InetSocketAddress addr, int rpcTimeout,
   static ConnectionId getConnectionId(InetSocketAddress addr, int rpcTimeout,
       Configuration conf) throws IOException {
       Configuration conf) throws IOException {
-    return ConnectionId.getConnectionId(addr, null, null, rpcTimeout, null,
-        conf);
+    return ConnectionId.getConnectionId(addr, null,
+        UserGroupInformation.getCurrentUser(), rpcTimeout, null, conf);
   }
   }
 
 
   static Writable call(Client client, InetSocketAddress addr,
   static Writable call(Client client, InetSocketAddress addr,
@@ -1402,6 +1410,80 @@ public class TestIPC {
     client.stop();
     client.stop();
   }
   }
   
   
+  @Test(timeout=4000)
+  public void testInsecureVersionMismatch() throws IOException {
+    checkVersionMismatch();
+  }
+
+  @Test(timeout=4000)
+  public void testSecureVersionMismatch() throws IOException {
+    SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, conf);
+    UserGroupInformation.setConfiguration(conf);
+    checkVersionMismatch();
+  }
+
+  private void checkVersionMismatch() throws IOException {
+    try (final ServerSocket listenSocket = new ServerSocket()) {
+      listenSocket.bind(null);
+      InetSocketAddress addr =
+          (InetSocketAddress) listenSocket.getLocalSocketAddress();
+
+      // open a socket that accepts a client and immediately returns
+      // a version mismatch exception.
+      ExecutorService executor = Executors.newSingleThreadExecutor();
+      executor.submit(new Runnable(){
+        @Override
+        public void run() {
+          try {
+            Socket socket = listenSocket.accept();
+            socket.getOutputStream().write(
+                NetworkTraces.RESPONSE_TO_HADOOP_0_20_3_RPC);
+            socket.close();
+          } catch (Throwable t) {
+            // ignore.
+          }
+        }
+      });
+
+      try {
+        Client client = new Client(LongWritable.class, conf);
+        call(client, 0, addr, conf);
+      } catch (RemoteException re) {
+        Assert.assertEquals(RPC.VersionMismatch.class.getName(),
+            re.getClassName());
+        Assert.assertEquals(NetworkTraces.HADOOP0_20_ERROR_MSG,
+            re.getMessage());
+        return;
+      }
+      Assert.fail("didn't get version mismatch");
+    }
+  }
+
+  @Test
+  public void testRpcResponseLimit() throws Throwable {
+    Server server = new TestServer(1, false);
+    InetSocketAddress addr = NetUtils.getConnectAddress(server);
+    server.start();
+
+    conf.setInt(CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH, 0);
+    Client client = new Client(LongWritable.class, conf);
+    call(client, 0, addr, conf);
+
+    conf.setInt(CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH, 4);
+    client = new Client(LongWritable.class, conf);
+    try {
+      call(client, 0, addr, conf);
+    } catch (IOException ioe) {
+      Throwable t = ioe.getCause();
+      Assert.assertNotNull(t);
+      Assert.assertEquals(RpcException.class, t.getClass());
+      Assert.assertEquals("RPC response exceeds maximum data length",
+          t.getMessage());
+      return;
+    }
+    Assert.fail("didn't get limit exceeded");
+  }
+
   private void doIpcVersionTest(
   private void doIpcVersionTest(
       byte[] requestData,
       byte[] requestData,
       byte[] expectedResponse) throws IOException {
       byte[] expectedResponse) throws IOException {