HDFS-1330 and HADOOP-6889. Added additional unit tests. Contributed by John George.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1163464 13f79535-47bb-0310-9956-ffa450edef68
Matthew Foley · 13 years ago · commit 0fdcf88498

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -1084,6 +1084,7 @@ Release 0.22.0 - Unreleased
     (jghoman)
 
     HDFS-1330. Make RPCs to DataNodes timeout. (hairong)
+    Added additional unit tests per HADOOP-6889. (John George via mattf)
 
     HDFS-202.  HDFS support of listLocatedStatus introduced in HADOOP-6870.
     HDFS piggyback block locations to each file status when listing a

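For context, both new tests follow the same client-side pattern: create an RPC proxy with an explicit rpcTimeout against a stub server that sleeps longer than that timeout, then assert that a SocketTimeoutException is raised. A minimal sketch of that pattern, reusing the calls from the diff below (the 500 ms timeout and the fake IDs are the tests' illustrative values, not recommended settings):

    // Sketch only: mirrors testClientDNProtocolTimeout in the diff below.
    ClientDatanodeProtocol proxy = DFSUtil.createClientDatanodeProtocolProxy(
        fakeDnId, conf, 500 /* rpcTimeout in ms */, fakeBlock);
    try {
      proxy.getReplicaVisibleLength(null);  // stub server sleeps ~2 s, so this times out
      fail("Did not get expected exception: SocketTimeoutException");
    } catch (SocketTimeoutException e) {
      // expected: the 500 ms rpcTimeout fires before the server replies
    } finally {
      RPC.stopProxy(proxy);
    }
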
+ 89 - 7
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java

@@ -25,7 +25,12 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
+import java.net.SocketTimeoutException;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.LongWritable;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.security.MessageDigest;
@@ -44,6 +49,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -52,6 +59,11 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
 import org.mockito.internal.stubbing.answers.ThrowsException;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -61,9 +73,51 @@ import org.mockito.stubbing.Answer;
  * properly in case of errors.
  */
 public class TestDFSClientRetries extends TestCase {
+  private static final String ADDRESS = "0.0.0.0";
+  final static private int PING_INTERVAL = 1000;
+  final static private int MIN_SLEEP_TIME = 1000;
   public static final Log LOG =
     LogFactory.getLog(TestDFSClientRetries.class.getName());
-  
+  final static private Configuration conf = new HdfsConfiguration();
+
+  private static class TestServer extends Server {
+    private boolean sleep;
+    private Class<? extends Writable> responseClass;
+
+    public TestServer(int handlerCount, boolean sleep) throws IOException {
+      this(handlerCount, sleep, LongWritable.class, null);
+    }
+
+    public TestServer(int handlerCount, boolean sleep,
+        Class<? extends Writable> paramClass,
+        Class<? extends Writable> responseClass)
+      throws IOException {
+      super(ADDRESS, 0, paramClass, handlerCount, conf);
+      this.sleep = sleep;
+      this.responseClass = responseClass;
+    }
+
+    @Override
+    public Writable call(Class<?> protocol, Writable param, long receiveTime)
+        throws IOException {
+      if (sleep) {
+        // sleep longer than the client's rpcTimeout (2 s total) to force a timeout
+        try {
+          Thread.sleep(PING_INTERVAL + MIN_SLEEP_TIME);
+        } catch (InterruptedException e) {}
+      }
+      if (responseClass != null) {
+        try {
+          return responseClass.newInstance();
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      } else {
+        return param;                               // echo param as result
+      }
+    }
+  }
+ 
   // writes 'len' bytes of data to out.
   private static void writeData(OutputStream out, int len) throws IOException {
     byte [] buf = new byte[4096*16];
@@ -80,8 +134,6 @@ public class TestDFSClientRetries extends TestCase {
    */
   public void testWriteTimeoutAtDataNode() throws IOException,
                                                   InterruptedException { 
-    Configuration conf = new HdfsConfiguration();
-    
     final int writeTimeout = 100; //milliseconds.
     // set a very short write timeout for datanode, so that tests runs fast.
     conf.setInt(DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY, writeTimeout); 
@@ -136,7 +188,6 @@ public class TestDFSClientRetries extends TestCase {
   { 
     final String exceptionMsg = "Nope, not replicated yet...";
     final int maxRetries = 1; // Allow one retry (total of two calls)
-    Configuration conf = new HdfsConfiguration();
     conf.setInt(DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, maxRetries);
     
     NameNode mockNN = mock(NameNode.class);
@@ -182,7 +233,6 @@ public class TestDFSClientRetries extends TestCase {
     long fileSize = 4096;
     Path file = new Path("/testFile");
 
-    Configuration conf = new Configuration();
     // Set short retry timeout so this test runs faster
     conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 10);
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
@@ -379,7 +429,6 @@ public class TestDFSClientRetries extends TestCase {
     long blockSize = 128*1024*1024; // DFS block size
     int bufferSize = 4096;
     
-    Configuration conf = new HdfsConfiguration();
     conf.setInt(DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY, xcievers);
     conf.setInt(DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY, 
                 retries);
@@ -540,7 +589,6 @@ public class TestDFSClientRetries extends TestCase {
     final String f = "/testGetFileChecksum";
     final Path p = new Path(f);
 
-    final Configuration conf = new Configuration();
     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
     try {
       cluster.waitActive();
@@ -566,5 +614,39 @@ public class TestDFSClientRetries extends TestCase {
       cluster.shutdown();
     }
   }
+
+  /** Test that a timeout occurs when the DN does not respond to an RPC.
+   * Start a server and ask it to sleep for n seconds, make an RPC to it
+   * with rpcTimeout set to less than n, and verify that a
+   * SocketTimeoutException is thrown.
+   */
+  public void testClientDNProtocolTimeout() throws IOException {
+    final Server server = new TestServer(1, true);
+    server.start();
+
+    final InetSocketAddress addr = NetUtils.getConnectAddress(server);
+    DatanodeID fakeDnId = new DatanodeID(
+        "localhost:" + addr.getPort(), "fake-storage", 0, addr.getPort());
+    
+    ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L));
+    LocatedBlock fakeBlock = new LocatedBlock(b, new DatanodeInfo[0]);
+
+    ClientDatanodeProtocol proxy = null;
+
+    try {
+      proxy = DFSUtil.createClientDatanodeProtocolProxy(
+          fakeDnId, conf, 500, fakeBlock);
+
+      proxy.getReplicaVisibleLength(null);
+      fail("Did not get expected exception: SocketTimeoutException");
+    } catch (SocketTimeoutException e) {
+      LOG.info("Got the expected Exception: SocketTimeoutException");
+    } finally {
+      if (proxy != null) {
+        RPC.stopProxy(proxy);
+      }
+      server.stop();
+    }
+  }
 }
 

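The timeout in both tests is driven entirely by the stub server: TestServer overrides Server.call() to sleep PING_INTERVAL + MIN_SLEEP_TIME = 1000 + 1000 = 2000 ms, while each proxy is created with a 500 ms rpcTimeout, so the client reliably times out long before the server responds. The second test file below embeds an identical TestServer stub and applies the same pattern to an InterDatanodeProtocol proxy.
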
+ 88 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java

@@ -22,6 +22,20 @@ import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.List;
+import java.net.InetSocketAddress;
+
+import java.net.SocketTimeoutException;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
+
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -38,6 +52,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
@@ -48,6 +63,50 @@ import org.junit.Test;
  * This tests InterDataNodeProtocol for block handling. 
  */
 public class TestInterDatanodeProtocol {
+  private static final String ADDRESS = "0.0.0.0";
+  final static private int PING_INTERVAL = 1000;
+  final static private int MIN_SLEEP_TIME = 1000;
+  private static Configuration conf = new HdfsConfiguration();
+
+  private static class TestServer extends Server {
+    private boolean sleep;
+    private Class<? extends Writable> responseClass;
+
+    public TestServer(int handlerCount, boolean sleep) throws IOException {
+      this(handlerCount, sleep, LongWritable.class, null);
+    }
+
+    public TestServer(int handlerCount, boolean sleep,
+        Class<? extends Writable> paramClass,
+        Class<? extends Writable> responseClass)
+      throws IOException {
+      super(ADDRESS, 0, paramClass, handlerCount, conf);
+      this.sleep = sleep;
+      this.responseClass = responseClass;
+    }
+
+    @Override
+    public Writable call(Class<?> protocol, Writable param, long receiveTime)
+        throws IOException {
+      if (sleep) {
+        // sleep longer than the client's rpcTimeout (2 s total) to force a timeout
+        try {
+          Thread.sleep(PING_INTERVAL + MIN_SLEEP_TIME);
+        } catch (InterruptedException e) {}
+      }
+      if (responseClass != null) {
+        try {
+          return responseClass.newInstance();
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      } else {
+        return param;                               // echo param as result
+      }
+    }
+  }
+
   public static void checkMetaInfo(ExtendedBlock b, DataNode dn) throws IOException {
     Block metainfo = dn.data.getStoredBlock(b.getBlockPoolId(), b.getBlockId());
     Assert.assertEquals(b.getBlockId(), metainfo.getBlockId());
@@ -73,7 +132,6 @@ public class TestInterDatanodeProtocol {
    */
   @Test
   public void testBlockMetaDataInfo() throws Exception {
-    Configuration conf = new HdfsConfiguration();
     MiniDFSCluster cluster = null;
 
     try {
@@ -222,7 +280,6 @@ public class TestInterDatanodeProtocol {
    * */
   @Test
   public void testUpdateReplicaUnderRecovery() throws IOException {
-    final Configuration conf = new HdfsConfiguration();
     MiniDFSCluster cluster = null;
 
     try {
@@ -291,4 +348,33 @@ public class TestInterDatanodeProtocol {
       if (cluster != null) cluster.shutdown();
     }
   }
+
+  /** Test to verify that an InterDatanode RPC times out as expected when
+   *  the server DN does not respond.
+   */
+  @Test
+  public void testInterDNProtocolTimeout() throws Exception {
+    final Server server = new TestServer(1, true);
+    server.start();
+
+    final InetSocketAddress addr = NetUtils.getConnectAddress(server);
+    DatanodeID fakeDnId = new DatanodeID(
+        "localhost:" + addr.getPort(), "fake-storage", 0, addr.getPort());
+    DatanodeInfo dInfo = new DatanodeInfo(fakeDnId);
+    InterDatanodeProtocol proxy = null;
+
+    try {
+      proxy = DataNode.createInterDataNodeProtocolProxy(
+          dInfo, conf, 500);
+      proxy.initReplicaRecovery(null);
+      fail("Expected SocketTimeoutException, but did not get one.");
+    } catch (SocketTimeoutException e) {
+      DataNode.LOG.info("Got expected SocketTimeoutException: " + e);
+    } finally {
+      if (proxy != null) {
+        RPC.stopProxy(proxy);
+      }
+      server.stop();
+    }
+  }
 }