|
@@ -17,10 +17,12 @@
|
|
|
*/
|
|
|
package org.apache.hadoop.hdfs;
|
|
|
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
|
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY;
|
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
|
|
|
import static org.junit.Assert.*;
|
|
|
|
|
|
+import java.io.InputStream;
|
|
|
import java.io.PrintWriter;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.net.Socket;
|
|
@@ -40,6 +42,8 @@ import org.junit.After;
|
|
|
import org.junit.Before;
|
|
|
import org.junit.Test;
|
|
|
|
|
|
+import com.google.common.io.NullOutputStream;
|
|
|
+
|
|
|
public class TestDataTransferKeepalive {
|
|
|
Configuration conf = new HdfsConfiguration();
|
|
|
private MiniDFSCluster cluster;
|
|
@@ -56,6 +60,8 @@ public class TestDataTransferKeepalive {
|
|
|
public void setup() throws Exception {
|
|
|
conf.setInt(DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY,
|
|
|
KEEPALIVE_TIMEOUT);
|
|
|
+ conf.setInt(DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY,
|
|
|
+ 0);
|
|
|
|
|
|
cluster = new MiniDFSCluster.Builder(conf)
|
|
|
.numDataNodes(1).build();
|
|
@@ -143,6 +149,40 @@ public class TestDataTransferKeepalive {
|
|
|
IOUtils.closeStream(stm);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ @Test(timeout=30000)
|
|
|
+ public void testManyClosedSocketsInCache() throws Exception {
|
|
|
+ // Make a small file
|
|
|
+ DFSTestUtil.createFile(fs, TEST_FILE, 1L, (short)1, 0L);
|
|
|
+
|
|
|
+ // Insert a bunch of dead sockets in the cache, by opening
|
|
|
+ // many streams concurrently, reading all of the data,
|
|
|
+ // and then closing them.
|
|
|
+ InputStream[] stms = new InputStream[5];
|
|
|
+ try {
|
|
|
+ for (int i = 0; i < stms.length; i++) {
|
|
|
+ stms[i] = fs.open(TEST_FILE);
|
|
|
+ }
|
|
|
+ for (InputStream stm : stms) {
|
|
|
+ IOUtils.copyBytes(stm, new NullOutputStream(), 1024);
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ IOUtils.cleanup(null, stms);
|
|
|
+ }
|
|
|
+
|
|
|
+ DFSClient client = ((DistributedFileSystem)fs).dfs;
|
|
|
+ assertEquals(5, client.socketCache.size());
|
|
|
+
|
|
|
+ // Let all the xceivers timeout
|
|
|
+ Thread.sleep(1500);
|
|
|
+ assertXceiverCount(0);
|
|
|
+
|
|
|
+ // Client side still has the sockets cached
|
|
|
+ assertEquals(5, client.socketCache.size());
|
|
|
+
|
|
|
+ // Reading should not throw an exception.
|
|
|
+ DFSTestUtil.readFile(fs, TEST_FILE);
|
|
|
+ }
|
|
|
|
|
|
private void assertXceiverCount(int expected) {
|
|
|
// Subtract 1, since the DataXceiverServer
|