ソースを参照

HADOOP-6889. Make RPC to have an option to timeout - backport to 0.20-security. Contributed by John George and Ravi Prakash.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security@1153219 13f79535-47bb-0310-9956-ffa450edef68
Matthew Foley 14 年 前
コミット
a2df5fa092

+ 3 - 0
CHANGES.txt

@@ -41,6 +41,9 @@ Release 0.20.205.0 - unreleased
     MAPREDUCE-2494. Make the distributed cache delete entires using LRU priority
     (Robert Joseph Evans via mahadev)
 
+    HADOOP-6889. Make RPC to have an option to timeout - backport to 
+    0.20-security. (John George and Ravi Prakash via mattf)
+
 Release 0.20.204.0 - unreleased
 
   NEW FEATURES

+ 41 - 18
src/core/org/apache/hadoop/ipc/Client.java

@@ -197,6 +197,7 @@ public class Client {
     private Socket socket = null;                 // connected socket
     private DataInputStream in;
     private DataOutputStream out;
+    private int rpcTimeout;
     private int maxIdleTime; //connections will be culled if it was idle for
          //maxIdleTime msecs
     private int maxRetries; //the max. no. of retries for socket connections
@@ -223,7 +224,7 @@ public class Client {
       if (LOG.isDebugEnabled()) {
         LOG.debug("The ping interval is" + this.pingInterval + "ms.");
       }
-      
+      this.rpcTimeout = remoteId.getRpcTimeout();
       UserGroupInformation ticket = remoteId.getTicket();
       Class<?> protocol = remoteId.getProtocol();
       this.useSasl = UserGroupInformation.isSecurityEnabled();
@@ -305,11 +306,13 @@ public class Client {
       }
 
       /* Process timeout exception
-       * if the connection is not going to be closed, send a ping.
+       * if the connection is not going to be closed or 
+       * is not configured to have a RPC timeout, send a ping.
+       * (if rpcTimeout is not set to be 0, then RPC should timeout.
        * otherwise, throw the timeout exception.
        */
       private void handleTimeout(SocketTimeoutException e) throws IOException {
-        if (shouldCloseConnection.get() || !running.get()) {
+        if (shouldCloseConnection.get() || !running.get() || rpcTimeout > 0) {
           throw e;
         } else {
           sendPing();
@@ -412,6 +415,10 @@ public class Client {
           
           // connection time out is 20s
           NetUtils.connect(this.socket, remoteId.getAddress(), 20000);
+          if (rpcTimeout > 0) {
+            pingInterval = rpcTimeout;  // rpcTimeout overwrites pingInterval
+          }
+
           this.socket.setSoTimeout(pingInterval);
           return;
         } catch (SocketTimeoutException toe) {
@@ -965,39 +972,41 @@ public class Client {
   public Writable call(Writable param, InetSocketAddress addr, 
       UserGroupInformation ticket)  
       throws InterruptedException, IOException {
-    ConnectionId remoteId = ConnectionId.getConnectionId(addr, null, ticket,
+    ConnectionId remoteId = ConnectionId.getConnectionId(addr, null, ticket, 0,
         conf);
     return call(param, remoteId);
   }
   
   /** Make a call, passing <code>param</code>, to the IPC server running at
-   * <code>address</code> which is servicing the <code>protocol</code> protocol, 
-   * with the <code>ticket</code> credentials, returning the value.  
+   * <code>address</code> which is servicing the <code>protocol</code> protocol,
+   * with the <code>ticket</code> credentials and <code>rpcTimeout</code> as
+   * timeout, returning the value.
    * Throws exceptions if there are network problems or if the remote code 
    * threw an exception. 
    * @deprecated Use {@link #call(Writable, ConnectionId)} instead 
    */
   @Deprecated
   public Writable call(Writable param, InetSocketAddress addr, 
-                       Class<?> protocol, UserGroupInformation ticket)  
+                       Class<?> protocol, UserGroupInformation ticket,
+                       int rpcTimeout)
                        throws InterruptedException, IOException {
     ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
-        ticket, conf);
+        ticket, rpcTimeout, conf);
     return call(param, remoteId);
   }
   
   /** Make a call, passing <code>param</code>, to the IPC server running at
    * <code>address</code> which is servicing the <code>protocol</code> protocol, 
-   * with the <code>ticket</code> credentials and <code>conf</code> as 
-   * configuration for this connection, returning the value.  
-   * Throws exceptions if there are network problems or if the remote code 
+   * with the <code>ticket</code> credentials, <code>rpcTimeout</code> as timeout 
+   * and <code>conf</code> as configuration for this connection, returning the 
+   * value. Throws exceptions if there are network problems or if the remote code 
    * threw an exception. */
-  public Writable call(Writable param, InetSocketAddress addr, 
+  public Writable call(Writable param, InetSocketAddress addr,
                        Class<?> protocol, UserGroupInformation ticket,
-                       Configuration conf)  
+                       int rpcTimeout, Configuration conf)
                        throws InterruptedException, IOException {
     ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
-        ticket, conf);
+        ticket, rpcTimeout, conf);
     return call(param, remoteId);
   }
   
@@ -1107,7 +1116,7 @@ public class Client {
         ParallelCall call = new ParallelCall(params[i], results, i);
         try {
           ConnectionId remoteId = ConnectionId.getConnectionId(addresses[i],
-              protocol, ticket, conf);
+              protocol, ticket, 0, conf);
           Connection connection = getConnection(remoteId, call);
           connection.sendParam(call);             // send each parameter
         } catch (IOException e) {
@@ -1175,6 +1184,7 @@ public class Client {
      UserGroupInformation ticket;
      Class<?> protocol;
      private static final int PRIME = 16777619;
+     private int rpcTimeout;
      private String serverPrincipal;
      private int maxIdleTime; //connections will be culled if it was idle for 
      //maxIdleTime msecs
@@ -1183,13 +1193,14 @@ public class Client {
      private int pingInterval; // how often sends ping to the server in msecs
      
      ConnectionId(InetSocketAddress address, Class<?> protocol, 
-                  UserGroupInformation ticket,
+                  UserGroupInformation ticket, int rpcTimeout,
                   String serverPrincipal, int maxIdleTime, 
                   int maxRetries, boolean tcpNoDelay,
                   int pingInterval) {
        this.protocol = protocol;
        this.address = address;
        this.ticket = ticket;
+       this.rpcTimeout = rpcTimeout;
        this.serverPrincipal = serverPrincipal;
        this.maxIdleTime = maxIdleTime;
        this.maxRetries = maxRetries;
@@ -1208,7 +1219,11 @@ public class Client {
      UserGroupInformation getTicket() {
        return ticket;
      }
-     
+    
+     private int getRpcTimeout() {
+       return rpcTimeout;
+     }
+ 
      String getServerPrincipal() {
        return serverPrincipal;
      }
@@ -1232,9 +1247,15 @@ public class Client {
      static ConnectionId getConnectionId(InetSocketAddress addr,
          Class<?> protocol, UserGroupInformation ticket,
          Configuration conf) throws IOException {
+       return getConnectionId(addr, protocol, ticket, 0, conf);
+     }
+
+     static ConnectionId getConnectionId(InetSocketAddress addr,
+         Class<?> protocol, UserGroupInformation ticket, int rpcTimeout,
+         Configuration conf) throws IOException {
        String remotePrincipal = getRemotePrincipal(conf, addr, protocol);
        return new ConnectionId(addr, protocol, ticket,
-           remotePrincipal,
+           rpcTimeout, remotePrincipal,
            conf.getInt("ipc.client.connection.maxidletime", 10000), // 10s
            conf.getInt("ipc.client.connect.max.retries", 10),
            conf.getBoolean("ipc.client.tcpnodelay", false),
@@ -1276,6 +1297,7 @@ public class Client {
              && this.maxRetries == that.maxRetries
              && this.pingInterval == that.pingInterval
              && isEqual(this.protocol, that.protocol)
+             && this.rpcTimeout == that.rpcTimeout
              && isEqual(this.serverPrincipal, that.serverPrincipal)
              && this.tcpNoDelay == that.tcpNoDelay
              && isEqual(this.ticket, that.ticket);
@@ -1291,6 +1313,7 @@ public class Client {
        result = PRIME * result + maxRetries;
        result = PRIME * result + pingInterval;
        result = PRIME * result + ((protocol == null) ? 0 : protocol.hashCode());
+       result = PRIME * rpcTimeout;
        result = PRIME * result
            + ((serverPrincipal == null) ? 0 : serverPrincipal.hashCode());
        result = PRIME * result + (tcpNoDelay ? 1231 : 1237);

+ 54 - 14
src/core/org/apache/hadoop/ipc/RPC.java

@@ -207,9 +207,10 @@ public class RPC {
 
     public Invoker(Class<? extends VersionedProtocol> protocol,
         InetSocketAddress address, UserGroupInformation ticket,
-        Configuration conf, SocketFactory factory) throws IOException {
+        Configuration conf, SocketFactory factory,
+        int rpcTimeout) throws IOException {
       this.remoteId = Client.ConnectionId.getConnectionId(address, protocol,
-          ticket, conf);
+          ticket, rpcTimeout, conf);
       this.client = CLIENTS.getClient(conf, factory);
     }
 
@@ -292,7 +293,7 @@ public class RPC {
       InetSocketAddress addr,
       Configuration conf
       ) throws IOException {
-    return waitForProxy(protocol, clientVersion, addr, conf, Long.MAX_VALUE);
+    return waitForProxy(protocol, clientVersion, addr, conf, 0, Long.MAX_VALUE);
   }
 
   /**
@@ -301,7 +302,7 @@ public class RPC {
    * @param clientVersion client version
    * @param addr remote address
    * @param conf configuration to use
-   * @param timeout time in milliseconds before giving up
+   * @param connTimeout time in milliseconds before giving up
    * @return the proxy
    * @throws IOException if the far end through a RemoteException
    */
@@ -310,13 +311,24 @@ public class RPC {
                                                long clientVersion,
                                                InetSocketAddress addr,
                                                Configuration conf,
-                                               long timeout
-                                               ) throws IOException { 
+                                               long connTimeout)
+                                               throws IOException { 
+    return waitForProxy(protocol, clientVersion, addr, conf, 0, connTimeout);
+  }
+
+  static VersionedProtocol waitForProxy(
+                      Class<? extends VersionedProtocol> protocol,
+                                               long clientVersion,
+                                               InetSocketAddress addr,
+                                               Configuration conf,
+                                               int rpcTimeout,
+                                               long connTimeout)
+                                               throws IOException { 
     long startTime = System.currentTimeMillis();
     IOException ioe;
     while (true) {
       try {
-        return getProxy(protocol, clientVersion, addr, conf);
+        return getProxy(protocol, clientVersion, addr, conf, rpcTimeout);
       } catch(ConnectException se) {  // namenode has not been started
         LOG.info("Server at " + addr + " not available yet, Zzzzz...");
         ioe = se;
@@ -325,7 +337,7 @@ public class RPC {
         ioe = te;
       }
       // check if timed out
-      if (System.currentTimeMillis()-timeout >= startTime) {
+      if (System.currentTimeMillis()-connTimeout >= startTime) {
         throw ioe;
       }
 
@@ -337,6 +349,7 @@ public class RPC {
       }
     }
   }
+
   /** Construct a client-side proxy object that implements the named protocol,
    * talking to a server at the named address. */
   public static VersionedProtocol getProxy(
@@ -344,15 +357,34 @@ public class RPC {
       long clientVersion, InetSocketAddress addr, Configuration conf,
       SocketFactory factory) throws IOException {
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-    return getProxy(protocol, clientVersion, addr, ugi, conf, factory);
+    return getProxy(protocol, clientVersion, addr, ugi, conf, factory, 0);
   }
-  
+ 
+  /** Construct a client-side proxy object that implements the named protocol,
+   * talking to a server at the named address. */
+  public static VersionedProtocol getProxy(
+      Class<? extends VersionedProtocol> protocol,
+      long clientVersion, InetSocketAddress addr, Configuration conf,
+      SocketFactory factory, int rpcTimeout) throws IOException {
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    return getProxy(protocol, clientVersion, addr, ugi, conf, factory, rpcTimeout);
+  }
+    
+  /** Construct a client-side proxy object that implements the named protocol,
+   * talking to a server at the named address. */
+  public static VersionedProtocol getProxy(
+      Class<? extends VersionedProtocol> protocol,
+      long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
+      Configuration conf, SocketFactory factory) throws IOException {
+    return getProxy(protocol, clientVersion, addr, ticket, conf, factory, 0);
+  }
+
   /** Construct a client-side proxy object that implements the named protocol,
    * talking to a server at the named address. */
   public static VersionedProtocol getProxy(
       Class<? extends VersionedProtocol> protocol,
       long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
-      Configuration conf, SocketFactory factory) throws IOException {    
+      Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException {
 
     if (UserGroupInformation.isSecurityEnabled()) {
       SaslRpcServer.init(conf);
@@ -360,7 +392,7 @@ public class RPC {
     VersionedProtocol proxy =
         (VersionedProtocol) Proxy.newProxyInstance(
             protocol.getClassLoader(), new Class[] { protocol },
-            new Invoker(protocol, addr, ticket, conf, factory));
+            new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
     long serverVersion = proxy.getProtocolVersion(protocol.getName(), 
                                                   clientVersion);
     if (serverVersion == clientVersion) {
@@ -385,9 +417,17 @@ public class RPC {
       Class<? extends VersionedProtocol> protocol,
       long clientVersion, InetSocketAddress addr, Configuration conf)
       throws IOException {
+    return getProxy(protocol, clientVersion, addr, conf,
+        NetUtils.getDefaultSocketFactory(conf), 0);
+  }
+
+  public static VersionedProtocol getProxy(
+      Class<? extends VersionedProtocol> protocol,
+      long clientVersion, InetSocketAddress addr, Configuration conf, int rpcTimeout)
+      throws IOException {
 
-    return getProxy(protocol, clientVersion, addr, conf, 
-        NetUtils.getDefaultSocketFactory(conf));
+    return getProxy(protocol, clientVersion, addr, conf,
+        NetUtils.getDefaultSocketFactory(conf), rpcTimeout);
   }
 
   /**

+ 3 - 3
src/hdfs/org/apache/hadoop/hdfs/DFSClient.java

@@ -134,7 +134,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
 
   static ClientDatanodeProtocol createClientDatanodeProtocolProxy (
       DatanodeID datanodeid, Configuration conf, 
-      Block block, Token<BlockTokenIdentifier> token) throws IOException {
+      Block block, Token<BlockTokenIdentifier> token, int socketTimeout) throws IOException {
     InetSocketAddress addr = NetUtils.createSocketAddr(
       datanodeid.getHost() + ":" + datanodeid.getIpcPort());
     if (ClientDatanodeProtocol.LOG.isDebugEnabled()) {
@@ -145,7 +145,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
     ticket.addToken(token);
     return (ClientDatanodeProtocol)RPC.getProxy(ClientDatanodeProtocol.class,
         ClientDatanodeProtocol.versionID, addr, ticket, conf, NetUtils
-        .getDefaultSocketFactory(conf));
+        .getDefaultSocketFactory(conf), socketTimeout);
   }
         
   /**
@@ -2741,7 +2741,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
         try {
           // Pick the "least" datanode as the primary datanode to avoid deadlock.
           primaryNode = Collections.min(Arrays.asList(newnodes));
-          primary = createClientDatanodeProtocolProxy(primaryNode, conf, block, accessToken);
+          primary = createClientDatanodeProtocolProxy(primaryNode, conf, block, accessToken, socketTimeout);
           newBlock = primary.recoverBlock(block, isAppend, newnodes);
         } catch (IOException e) {
           recoveryErrorCount++;

+ 3 - 3
src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -537,7 +537,7 @@ public class DataNode extends Configured
   } 
 
   public static InterDatanodeProtocol createInterDataNodeProtocolProxy(
-      DatanodeID datanodeid, final Configuration conf) throws IOException {
+      DatanodeID datanodeid, final Configuration conf, final int socketTimeout) throws IOException {
     final InetSocketAddress addr = NetUtils.createSocketAddr(
         datanodeid.getHost() + ":" + datanodeid.getIpcPort());
     if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
@@ -551,7 +551,7 @@ public class DataNode extends Configured
             public InterDatanodeProtocol run() throws IOException {
               return (InterDatanodeProtocol) RPC.getProxy(
                   InterDatanodeProtocol.class, InterDatanodeProtocol.versionID,
-                  addr, conf);
+                  addr, conf, socketTimeout);
             }
           });
     } catch (InterruptedException ie) {
@@ -1712,7 +1712,7 @@ public class DataNode extends Configured
       for(DatanodeID id : datanodeids) {
         try {
           InterDatanodeProtocol datanode = dnRegistration.equals(id)?
-              this: DataNode.createInterDataNodeProtocolProxy(id, getConf());
+              this: DataNode.createInterDataNodeProtocolProxy(id, getConf(), socketTimeout);
           BlockMetaDataInfo info = datanode.getBlockMetaDataInfo(block);
           if (info != null && info.getGenerationStamp() >= block.getGenerationStamp()) {
             if (keepLength) {

+ 40 - 0
src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.DFSClient.DFSInputStream;
 import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
 import org.apache.hadoop.hdfs.server.common.*;
+import org.apache.hadoop.hdfs.server.datanode.TestInterDatanodeProtocol;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.io.*;
@@ -381,4 +382,43 @@ public class TestDFSClientRetries extends TestCase {
       return new LocatedBlocks(goodBlockList.getFileLength(), badBlocks, false);
     }
   }
+  
+  
+  /**
+   * The following test first creates a file.
+   * It verifies the block information from a datanode.
+   * Then, it stops the DN and observes timeout on connection attempt. 
+   */
+  public void testDFSClientTimeout() throws Exception {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = null;
+
+    try {
+      cluster = new MiniDFSCluster(conf, 3, true, null);
+      cluster.waitActive();
+
+      //create a file
+      DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
+      String filestr = "/foo";
+      Path filepath = new Path(filestr);
+      DFSTestUtil.createFile(dfs, filepath, 1024L, (short)3, 0L);
+      assertTrue(dfs.getClient().exists(filestr));
+
+      //get block info
+      LocatedBlock locatedblock = TestInterDatanodeProtocol.getLastLocatedBlock(dfs.getClient().namenode, filestr);
+      DatanodeInfo[] datanodeinfo = locatedblock.getLocations();
+      assertTrue(datanodeinfo.length > 0);
+
+      //shutdown a data node
+      cluster.stopDataNode(datanodeinfo[0].getName());
+      DFSClient.createClientDatanodeProtocolProxy(datanodeinfo[0], conf, 
+        locatedblock.getBlock(), locatedblock.getBlockToken(), 500);
+      fail("Expected an exception to have been thrown"); 
+    } catch (IOException e) {
+      DFSClient.LOG.info("Got a SocketTimeoutException ", e);
+    } finally {
+      if (cluster != null) {cluster.shutdown();}
+    }
+  }
+  
 }

+ 4 - 6
src/test/org/apache/hadoop/hdfs/TestLeaseRecovery.java

@@ -80,10 +80,8 @@ public class TestLeaseRecovery extends junit.framework.TestCase {
       assertEquals(REPLICATION_NUM, datanodeinfos.length);
 
       //connect to data nodes
-      InterDatanodeProtocol[] idps = new InterDatanodeProtocol[REPLICATION_NUM];
       DataNode[] datanodes = new DataNode[REPLICATION_NUM];
       for(int i = 0; i < REPLICATION_NUM; i++) {
-        idps[i] = DataNode.createInterDataNodeProtocolProxy(datanodeinfos[i], conf);
         datanodes[i] = cluster.getDataNode(datanodeinfos[i].getIpcPort());
         assertTrue(datanodes[i] != null);
       }
@@ -92,7 +90,7 @@ public class TestLeaseRecovery extends junit.framework.TestCase {
       Block lastblock = locatedblock.getBlock();
       DataNode.LOG.info("newblocks=" + lastblock);
       for(int i = 0; i < REPLICATION_NUM; i++) {
-        checkMetaInfo(lastblock, idps[i]);
+        checkMetaInfo(lastblock, datanodes[i]);
       }
 
       //setup random block sizes 
@@ -108,8 +106,8 @@ public class TestLeaseRecovery extends junit.framework.TestCase {
       for(int i = 0; i < REPLICATION_NUM; i++) {
         newblocks[i] = new Block(lastblock.getBlockId(), newblocksizes[i],
             lastblock.getGenerationStamp());
-        idps[i].updateBlock(lastblock, newblocks[i], false);
-        checkMetaInfo(newblocks[i], idps[i]);
+        datanodes[i].updateBlock(lastblock, newblocks[i], false);
+        checkMetaInfo(newblocks[i], datanodes[i]);
       }
 
       DataNode.LOG.info("dfs.dfs.clientName=" + dfs.dfs.clientName);
@@ -127,7 +125,7 @@ public class TestLeaseRecovery extends junit.framework.TestCase {
       long currentGS = cluster.getNameNode().namesystem.getGenerationStamp();
       lastblock.setGenerationStamp(currentGS);
       for(int i = 0; i < REPLICATION_NUM; i++) {
-        updatedmetainfo[i] = idps[i].getBlockMetaDataInfo(lastblock);
+        updatedmetainfo[i] = datanodes[i].getBlockMetaDataInfo(lastblock);
         assertEquals(lastblock.getBlockId(), updatedmetainfo[i].getBlockId());
         assertEquals(minsize, updatedmetainfo[i].getNumBytes());
         assertEquals(currentGS, updatedmetainfo[i].getGenerationStamp());

+ 40 - 2
src/test/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java

@@ -88,10 +88,10 @@ public class TestInterDatanodeProtocol extends junit.framework.TestCase {
       assertTrue(datanodeinfo.length > 0);
 
       //connect to a data node
-      InterDatanodeProtocol idp = DataNode.createInterDataNodeProtocolProxy(
-          datanodeinfo[0], conf);
       DataNode datanode = cluster.getDataNode(datanodeinfo[0].getIpcPort());
       assertTrue(datanode != null);
+      InterDatanodeProtocol idp = DataNode.createInterDataNodeProtocolProxy(
+          datanodeinfo[0], conf, datanode.socketTimeout);
       
       //stop block scanner, so we could compare lastScanTime
       datanode.blockScannerThread.interrupt();
@@ -111,4 +111,42 @@ public class TestInterDatanodeProtocol extends junit.framework.TestCase {
       if (cluster != null) {cluster.shutdown();}
     }
   }
+
+  /**
+   * The following test first creates a file.
+   * It verifies the block information from a datanode.
+   * Then, it stops the DN and observes timeout on connection attempt. 
+   */
+  public void testInterDNProtocolTimeout() throws Exception {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = null;
+
+    try {
+      cluster = new MiniDFSCluster(conf, 3, true, null);
+      cluster.waitActive();
+
+      //create a file
+      DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
+      String filestr = "/foo";
+      Path filepath = new Path(filestr);
+      DFSTestUtil.createFile(dfs, filepath, 1024L, (short)3, 0L);
+      assertTrue(dfs.getClient().exists(filestr));
+
+      //get block info
+      LocatedBlock locatedblock = getLastLocatedBlock(dfs.getClient().namenode, filestr);
+      DatanodeInfo[] datanodeinfo = locatedblock.getLocations();
+      assertTrue(datanodeinfo.length > 0);
+
+      //shutdown a data node
+      cluster.stopDataNode(datanodeinfo[0].getName());
+      InterDatanodeProtocol idp = DataNode.createInterDataNodeProtocolProxy(
+          datanodeinfo[0], conf, 500);
+      fail("Expected an exception to have been thrown"); 
+    } catch (IOException e) {
+      InterDatanodeProtocol.LOG.info("Got a SocketTimeoutException ", e);
+    } finally {
+      if (cluster != null) {cluster.shutdown();}
+    }
+  }
+  
 }

+ 30 - 6
src/test/org/apache/hadoop/ipc/TestIPC.java

@@ -29,6 +29,7 @@ import java.util.Random;
 import java.io.DataInput;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
 
 import junit.framework.TestCase;
 
@@ -41,7 +42,8 @@ public class TestIPC extends TestCase {
   
   final private static Configuration conf = new Configuration();
   final static private int PING_INTERVAL = 1000;
-  
+  final static private int MIN_SLEEP_TIME = 1000;
+ 
   static {
     Client.setPingInterval(conf, PING_INTERVAL);
   }
@@ -64,8 +66,9 @@ public class TestIPC extends TestCase {
     public Writable call(Class<?> protocol, Writable param, long receiveTime)
         throws IOException {
       if (sleep) {
+        // sleep a bit
         try {
-          Thread.sleep(RANDOM.nextInt(2*PING_INTERVAL));      // sleep a bit
+          Thread.sleep(RANDOM.nextInt(PING_INTERVAL) + MIN_SLEEP_TIME);
         } catch (InterruptedException e) {}
       }
       return param;                               // echo param as result
@@ -89,7 +92,7 @@ public class TestIPC extends TestCase {
         try {
           LongWritable param = new LongWritable(RANDOM.nextLong());
           LongWritable value =
-            (LongWritable)client.call(param, server, null, null, conf);
+            (LongWritable)client.call(param, server, null, null, 0, conf);
           if (!param.equals(value)) {
             LOG.fatal("Call failed!");
             failed = true;
@@ -139,7 +142,7 @@ public class TestIPC extends TestCase {
   }
 
   public void testSerial() throws Exception {
-    testSerial(3, false, 2, 5, 100);
+    testSerial(3, false, 2, 5, 10);
   }
 
   public void testSerial(int handlerCount, boolean handlerSleep, 
@@ -217,7 +220,7 @@ public class TestIPC extends TestCase {
     InetSocketAddress address = new InetSocketAddress("127.0.0.1", 10);
     try {
       client.call(new LongWritable(RANDOM.nextLong()),
-              address, null, null, conf);
+              address, null, null, 0, conf);
       fail("Expected an exception to have been thrown");
     } catch (IOException e) {
       String message = e.getMessage();
@@ -232,6 +235,27 @@ public class TestIPC extends TestCase {
     }
   }
 
+  public void testIpcTimeout() throws Exception {
+    // start server
+    Server server = new TestServer(1, true);
+    InetSocketAddress addr = NetUtils.getConnectAddress(server);
+    server.start();
+
+    // start client
+    Client client = new Client(LongWritable.class, conf);
+    // set timeout to be less than MIN_SLEEP_TIME
+    try {
+      client.call(new LongWritable(RANDOM.nextLong()),
+              addr, null, null, MIN_SLEEP_TIME/2);
+      fail("Expected an exception to have been thrown");
+    } catch (SocketTimeoutException e) {
+     LOG.info("Get a SocketTimeoutException ", e);
+    }
+    // set timeout to be bigger than 3*ping interval
+    client.call(new LongWritable(RANDOM.nextLong()),
+        addr, null, null, 3*PING_INTERVAL+MIN_SLEEP_TIME);
+  }
+
   private static class LongErrorWritable extends LongWritable {
     private final static String ERR_MSG =
       "Come across an exception while reading";
@@ -258,7 +282,7 @@ public class TestIPC extends TestCase {
     Client client = new Client(LongErrorWritable.class, conf);
     try {
       client.call(new LongErrorWritable(RANDOM.nextLong()),
-          addr, null, null, conf);
+          addr, null, null, 0, conf);
       fail("Expected an exception to have been thrown");
     } catch (IOException e) {
       // check error