|
@@ -22,7 +22,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|
import org.apache.hadoop.test.GenericTestUtils;
|
|
import org.apache.hadoop.test.GenericTestUtils;
|
|
import org.apache.log4j.Level;
|
|
import org.apache.log4j.Level;
|
|
|
|
|
|
-import org.junit.After;
|
|
|
|
import org.junit.Test;
|
|
import org.junit.Test;
|
|
|
|
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
@@ -33,7 +32,6 @@ import java.net.Socket;
|
|
|
|
|
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_DEFAULT;
|
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_DEFAULT;
|
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY;
|
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY;
|
|
-import static org.junit.Assert.assertEquals;
|
|
|
|
import static org.junit.Assert.assertTrue;
|
|
import static org.junit.Assert.assertTrue;
|
|
|
|
|
|
public class TestDFSClientSocketSize {
|
|
public class TestDFSClientSocketSize {
|
|
@@ -43,55 +41,65 @@ public class TestDFSClientSocketSize {
|
|
GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.ALL);
|
|
GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.ALL);
|
|
}
|
|
}
|
|
|
|
|
|
- private final Configuration conf = new Configuration();
|
|
|
|
- private MiniDFSCluster cluster;
|
|
|
|
- private Socket socket;
|
|
|
|
-
|
|
|
|
|
|
+ /**
|
|
|
|
+ * The setting of socket send buffer size in
|
|
|
|
+ * {@link java.net.Socket#setSendBufferSize(int)} is only a hint. Actual
|
|
|
|
+ * value may differ. We just sanity check that it is somewhere close.
|
|
|
|
+ */
|
|
@Test
|
|
@Test
|
|
public void testDefaultSendBufferSize() throws IOException {
|
|
public void testDefaultSendBufferSize() throws IOException {
|
|
- socket = createSocket();
|
|
|
|
- assertEquals("Send buffer size should be the default value.",
|
|
|
|
- DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_DEFAULT,
|
|
|
|
- socket.getSendBufferSize());
|
|
|
|
|
|
+ assertTrue("Send buffer size should be somewhere near default.",
|
|
|
|
+ getSendBufferSize(new Configuration()) >=
|
|
|
|
+ DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_DEFAULT / 2);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Note that {@link java.net.Socket#setSendBufferSize(int)} is only a hint.
|
|
|
|
+ * If this test is flaky it should be ignored. See HADOOP-13351.
|
|
|
|
+ */
|
|
@Test
|
|
@Test
|
|
public void testSpecifiedSendBufferSize() throws IOException {
|
|
public void testSpecifiedSendBufferSize() throws IOException {
|
|
- final int mySendBufferSize = 64 * 1024; // 64 KB
|
|
|
|
- conf.setInt(DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY, mySendBufferSize);
|
|
|
|
- socket = createSocket();
|
|
|
|
- assertEquals("Send buffer size should be the customized value.",
|
|
|
|
- mySendBufferSize, socket.getSendBufferSize());
|
|
|
|
|
|
+ final Configuration conf1 = new Configuration();
|
|
|
|
+ conf1.setInt(DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY, 256 * 1024); // 256 KB
|
|
|
|
+ final int sendBufferSize1 = getSendBufferSize(conf1);
|
|
|
|
+
|
|
|
|
+ final Configuration conf2 = new Configuration();
|
|
|
|
+ conf2.setInt(DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY, 1024); // 1 KB
|
|
|
|
+ final int sendBufferSize2 = getSendBufferSize(conf2);
|
|
|
|
+
|
|
|
|
+ LOG.info("Large buf size is {}, small is {}",
|
|
|
|
+ sendBufferSize1, sendBufferSize2);
|
|
|
|
+ assertTrue("Larger specified send buffer should have effect",
|
|
|
|
+ sendBufferSize1 > sendBufferSize2);
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void testAutoTuningSendBufferSize() throws IOException {
|
|
public void testAutoTuningSendBufferSize() throws IOException {
|
|
|
|
+ final Configuration conf = new Configuration();
|
|
conf.setInt(DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY, 0);
|
|
conf.setInt(DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY, 0);
|
|
- socket = createSocket();
|
|
|
|
- LOG.info("The auto tuned send buffer size is: {}",
|
|
|
|
- socket.getSendBufferSize());
|
|
|
|
|
|
+ final int sendBufferSize = getSendBufferSize(conf);
|
|
|
|
+ LOG.info("The auto tuned send buffer size is: {}", sendBufferSize);
|
|
assertTrue("Send buffer size should be non-negative value which is " +
|
|
assertTrue("Send buffer size should be non-negative value which is " +
|
|
- "determined by system (kernel).", socket.getSendBufferSize() > 0);
|
|
|
|
|
|
+ "determined by system (kernel).", sendBufferSize > 0);
|
|
}
|
|
}
|
|
|
|
|
|
- @After
|
|
|
|
- public void tearDown() throws Exception {
|
|
|
|
- if (socket != null) {
|
|
|
|
- LOG.info("Closing the DFSClient socket.");
|
|
|
|
- }
|
|
|
|
- if (cluster != null) {
|
|
|
|
- LOG.info("Shutting down MiniDFSCluster.");
|
|
|
|
- cluster.shutdown();
|
|
|
|
- cluster = null;
|
|
|
|
|
|
+ private int getSendBufferSize(Configuration conf) throws IOException {
|
|
|
|
+ final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
|
|
|
+ .numDataNodes(1)
|
|
|
|
+ .build();
|
|
|
|
+ try {
|
|
|
|
+ cluster.waitActive();
|
|
|
|
+ LOG.info("MiniDFSCluster started.");
|
|
|
|
+ try (Socket socket = DataStreamer.createSocketForPipeline(
|
|
|
|
+ new DatanodeInfo(cluster.dataNodes.get(0).datanode.getDatanodeId()),
|
|
|
|
+ 1, cluster.getFileSystem().getClient())) {
|
|
|
|
+ return socket.getSendBufferSize();
|
|
|
|
+ }
|
|
|
|
+ } finally {
|
|
|
|
+ if (cluster != null) {
|
|
|
|
+ cluster.shutdown();
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private Socket createSocket() throws IOException {
|
|
|
|
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
|
|
|
- cluster.waitActive();
|
|
|
|
- LOG.info("MiniDFSCluster started.");
|
|
|
|
- return DataStreamer.createSocketForPipeline(
|
|
|
|
- new DatanodeInfo(cluster.dataNodes.get(0).datanode.getDatanodeId()),
|
|
|
|
- 1, cluster.getFileSystem().getClient());
|
|
|
|
- }
|
|
|
|
}
|
|
}
|