|
@@ -17,11 +17,17 @@
|
|
*/
|
|
*/
|
|
package org.apache.hadoop.hdfs;
|
|
package org.apache.hadoop.hdfs;
|
|
|
|
|
|
|
|
+import java.net.SocketTimeoutException;
|
|
|
|
+import org.apache.hadoop.io.IOUtils;
|
|
|
|
+import org.apache.hadoop.io.Writable;
|
|
|
|
+import org.apache.hadoop.io.LongWritable;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
|
|
+import java.net.InetSocketAddress;
|
|
import java.io.InputStream;
|
|
import java.io.InputStream;
|
|
import java.io.OutputStream;
|
|
import java.io.OutputStream;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
|
|
+import java.net.SocketTimeoutException;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -31,6 +37,9 @@ import org.apache.hadoop.fs.permission.FsPermission;
|
|
import org.apache.hadoop.hdfs.DFSClient.DFSInputStream;
|
|
import org.apache.hadoop.hdfs.DFSClient.DFSInputStream;
|
|
import org.apache.hadoop.hdfs.protocol.*;
|
|
import org.apache.hadoop.hdfs.protocol.*;
|
|
import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
|
|
import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
|
|
|
|
+import org.apache.hadoop.hdfs.protocol.Block;
|
|
|
|
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
|
|
|
+
|
|
import org.apache.hadoop.hdfs.server.common.*;
|
|
import org.apache.hadoop.hdfs.server.common.*;
|
|
import org.apache.hadoop.hdfs.server.datanode.TestInterDatanodeProtocol;
|
|
import org.apache.hadoop.hdfs.server.datanode.TestInterDatanodeProtocol;
|
|
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
|
|
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
|
|
@@ -41,6 +50,11 @@ import org.apache.hadoop.security.AccessControlException;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.security.token.Token;
|
|
import org.apache.hadoop.security.token.Token;
|
|
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
|
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
|
|
|
+import org.apache.hadoop.ipc.Client;
|
|
|
|
+import org.apache.hadoop.ipc.RPC;
|
|
|
|
+import org.apache.hadoop.ipc.Server;
|
|
|
|
+import org.apache.hadoop.net.NetUtils;
|
|
|
|
+
|
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
|
|
|
|
|
import junit.framework.TestCase;
|
|
import junit.framework.TestCase;
|
|
@@ -53,9 +67,51 @@ import org.mockito.invocation.InvocationOnMock;
|
|
* properly in case of errors.
|
|
* properly in case of errors.
|
|
*/
|
|
*/
|
|
public class TestDFSClientRetries extends TestCase {
|
|
public class TestDFSClientRetries extends TestCase {
|
|
|
|
+ private static final String ADDRESS = "0.0.0.0";
|
|
|
|
+ final static private int PING_INTERVAL = 1000;
|
|
|
|
+ final static private int MIN_SLEEP_TIME = 1000;
|
|
public static final Log LOG =
|
|
public static final Log LOG =
|
|
LogFactory.getLog(TestDFSClientRetries.class.getName());
|
|
LogFactory.getLog(TestDFSClientRetries.class.getName());
|
|
-
|
|
|
|
|
|
+ final static private Configuration conf = new Configuration();
|
|
|
|
+
|
|
|
|
+ private static class TestServer extends Server {
|
|
|
|
+ private boolean sleep;
|
|
|
|
+ private Class<? extends Writable> responseClass;
|
|
|
|
+
|
|
|
|
+ public TestServer(int handlerCount, boolean sleep) throws IOException {
|
|
|
|
+ this(handlerCount, sleep, LongWritable.class, null);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public TestServer(int handlerCount, boolean sleep,
|
|
|
|
+ Class<? extends Writable> paramClass,
|
|
|
|
+ Class<? extends Writable> responseClass)
|
|
|
|
+ throws IOException {
|
|
|
|
+ super(ADDRESS, 0, paramClass, handlerCount, conf);
|
|
|
|
+ this.sleep = sleep;
|
|
|
|
+ this.responseClass = responseClass;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public Writable call(Class<?> protocol, Writable param, long receiveTime)
|
|
|
|
+ throws IOException {
|
|
|
|
+ if (sleep) {
|
|
|
|
+ // sleep a bit
|
|
|
|
+ try {
|
|
|
|
+ Thread.sleep(PING_INTERVAL + MIN_SLEEP_TIME);
|
|
|
|
+ } catch (InterruptedException e) {}
|
|
|
|
+ }
|
|
|
|
+ if (responseClass != null) {
|
|
|
|
+ try {
|
|
|
|
+ return responseClass.newInstance();
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ throw new RuntimeException(e);
|
|
|
|
+ }
|
|
|
|
+ } else {
|
|
|
|
+ return param; // echo param as result
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
// writes 'len' bytes of data to out.
|
|
// writes 'len' bytes of data to out.
|
|
private static void writeData(OutputStream out, int len) throws IOException {
|
|
private static void writeData(OutputStream out, int len) throws IOException {
|
|
byte [] buf = new byte[4096*16];
|
|
byte [] buf = new byte[4096*16];
|
|
@@ -72,7 +128,6 @@ public class TestDFSClientRetries extends TestCase {
|
|
*/
|
|
*/
|
|
public void testWriteTimeoutAtDataNode() throws IOException,
|
|
public void testWriteTimeoutAtDataNode() throws IOException,
|
|
InterruptedException {
|
|
InterruptedException {
|
|
- Configuration conf = new Configuration();
|
|
|
|
|
|
|
|
final int writeTimeout = 100; //milliseconds.
|
|
final int writeTimeout = 100; //milliseconds.
|
|
// set a very short write timeout for datanode, so that tests runs fast.
|
|
// set a very short write timeout for datanode, so that tests runs fast.
|
|
@@ -245,8 +300,6 @@ public class TestDFSClientRetries extends TestCase {
|
|
|
|
|
|
public void testNotYetReplicatedErrors() throws IOException
|
|
public void testNotYetReplicatedErrors() throws IOException
|
|
{
|
|
{
|
|
- Configuration conf = new Configuration();
|
|
|
|
-
|
|
|
|
// allow 1 retry (2 total calls)
|
|
// allow 1 retry (2 total calls)
|
|
conf.setInt("dfs.client.block.write.locateFollowingBlock.retries", 1);
|
|
conf.setInt("dfs.client.block.write.locateFollowingBlock.retries", 1);
|
|
|
|
|
|
@@ -273,7 +326,6 @@ public class TestDFSClientRetries extends TestCase {
|
|
long fileSize = 4096;
|
|
long fileSize = 4096;
|
|
Path file = new Path("/testFile");
|
|
Path file = new Path("/testFile");
|
|
|
|
|
|
- Configuration conf = new Configuration();
|
|
|
|
MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
|
|
MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
|
|
|
|
|
|
int maxBlockAcquires = DFSClient.getMaxBlockAcquireFailures(conf);
|
|
int maxBlockAcquires = DFSClient.getMaxBlockAcquireFailures(conf);
|
|
@@ -383,42 +435,36 @@ public class TestDFSClientRetries extends TestCase {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * The following test first creates a file.
|
|
|
|
- * It verifies the block information from a datanode.
|
|
|
|
- * Then, it stops the DN and observes timeout on connection attempt.
|
|
|
|
|
|
+ /** Test that timeout occurs when DN does not respond to RPC.
|
|
|
|
+ * Start up a server and ask it to sleep for n seconds. Make an
|
|
|
|
+ * RPC to the server and set rpcTimeout to less than n and ensure
|
|
|
|
+ * that socketTimeoutException is obtained
|
|
*/
|
|
*/
|
|
- public void testDFSClientTimeout() throws Exception {
|
|
|
|
- Configuration conf = new Configuration();
|
|
|
|
- MiniDFSCluster cluster = null;
|
|
|
|
|
|
+ public void testClientDNProtocolTimeout() throws IOException {
|
|
|
|
+ final Server server = new TestServer(1, true);
|
|
|
|
+ server.start();
|
|
|
|
+
|
|
|
|
+ final InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
|
|
|
+ DatanodeID fakeDnId = new DatanodeID(
|
|
|
|
+ "localhost:" + addr.getPort(), "fake-storage", 0, addr.getPort());
|
|
|
|
+ DatanodeInfo dnInfo = new DatanodeInfo(fakeDnId);
|
|
|
|
+
|
|
|
|
+ LocatedBlock fakeBlock = new LocatedBlock(new Block(12345L), new DatanodeInfo[0]);
|
|
|
|
+
|
|
|
|
+ ClientDatanodeProtocol proxy = null;
|
|
|
|
|
|
try {
|
|
try {
|
|
- cluster = new MiniDFSCluster(conf, 3, true, null);
|
|
|
|
- cluster.waitActive();
|
|
|
|
|
|
+ proxy = DFSClient.createClientDatanodeProtocolProxy(dnInfo, conf,
|
|
|
|
+ fakeBlock.getBlock(), fakeBlock.getBlockToken(), 500);
|
|
|
|
|
|
- //create a file
|
|
|
|
- DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
|
|
|
|
- String filestr = "/foo";
|
|
|
|
- Path filepath = new Path(filestr);
|
|
|
|
- DFSTestUtil.createFile(dfs, filepath, 1024L, (short)3, 0L);
|
|
|
|
- assertTrue(dfs.getClient().exists(filestr));
|
|
|
|
-
|
|
|
|
- //get block info
|
|
|
|
- LocatedBlock locatedblock = TestInterDatanodeProtocol.getLastLocatedBlock(dfs.getClient().namenode, filestr);
|
|
|
|
- DatanodeInfo[] datanodeinfo = locatedblock.getLocations();
|
|
|
|
- assertTrue(datanodeinfo.length > 0);
|
|
|
|
-
|
|
|
|
- //shutdown a data node
|
|
|
|
- cluster.stopDataNode(datanodeinfo[0].getName());
|
|
|
|
- DFSClient.createClientDatanodeProtocolProxy(datanodeinfo[0], conf,
|
|
|
|
- locatedblock.getBlock(), locatedblock.getBlockToken(), 500);
|
|
|
|
- fail("Expected an exception to have been thrown");
|
|
|
|
- } catch (IOException e) {
|
|
|
|
- DFSClient.LOG.info("Got a SocketTimeoutException ", e);
|
|
|
|
|
|
+ fail ("Did not get expected exception: SocketTimeoutException");
|
|
|
|
+ } catch (SocketTimeoutException e) {
|
|
|
|
+ LOG.info("Got the expected Exception: SocketTimeoutException");
|
|
} finally {
|
|
} finally {
|
|
- if (cluster != null) {cluster.shutdown();}
|
|
|
|
|
|
+ if (proxy != null) {
|
|
|
|
+ RPC.stopProxy(proxy);
|
|
|
|
+ }
|
|
|
|
+ server.stop();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
}
|
|
}
|