Browse Source

HDFS-9669. TcpPeerServer should respect ipc.server.listen.queue.size (Elliot Clark via cmccabe)

(cherry picked from commit 2da03b48eba53d4dec2a77209ad9835d808171d1)
(cherry picked from commit 60d3a3c30b05fdeec5cc369a698297ee309d620e)
(cherry picked from commit c4c94e1cf2a8e03ee7888c69a7dd8f2b7e2df14d)
(cherry picked from commit 449d2ad85c64f2e1ab9cde38460094b935faa66e)
Colin Patrick Mccabe 9 năm trước cách đây
mục cha
commit
e6856d66e6

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -17,6 +17,9 @@ Release 2.6.5 - UNRELEASED
     HDFS-7258. CacheReplicationMonitor rescan schedule log should use DEBUG
     level instead of INFO level. (Xiaoyu Yao via wheat9)
 
+    HDFS-9669. TcpPeerServer should respect ipc.server.listen.queue.size
+    (Elliot Clark via cmccabe)
+
   OPTIMIZATIONS
 
     HDFS-8845. DiskChecker should not traverse the entire tree (Chang Li via

+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java

@@ -102,13 +102,15 @@ public class TcpPeerServer implements PeerServer {
    *
    * @param socketWriteTimeout    The Socket write timeout in ms.
    * @param bindAddr              The address to bind to.
+   * @param backlogLength         The length of the tcp accept backlog
    * @throws IOException
    */
   public TcpPeerServer(int socketWriteTimeout,
-        InetSocketAddress bindAddr) throws IOException {
+                       InetSocketAddress bindAddr,
+                       int backlogLength) throws IOException {
     this.serverSocket = (socketWriteTimeout > 0) ?
           ServerSocketChannel.open().socket() : new ServerSocket();
-    Server.bind(serverSocket, bindAddr, 0);
+    Server.bind(serverSocket, bindAddr, backlogLength);
   }
 
   /**

+ 5 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -47,6 +47,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY;
 import static org.apache.hadoop.util.ExitUtil.terminate;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 
 import java.io.BufferedOutputStream;
 import java.io.ByteArrayInputStream;
@@ -868,8 +869,11 @@ public class DataNode extends ReconfigurableBase
     if (secureResources != null) {
       tcpPeerServer = new TcpPeerServer(secureResources);
     } else {
+      int backlogLength = conf.getInt(
+          CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY,
+          CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT);
       tcpPeerServer = new TcpPeerServer(dnConf.socketWriteTimeout,
-          DataNode.getStreamingAddr(conf));
+          DataNode.getStreamingAddr(conf), backlogLength);
     }
     tcpPeerServer.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
     streamingAddr = tcpPeerServer.getStreamingAddr();

+ 5 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SecureDataNodeStarter.java

@@ -23,6 +23,7 @@ import java.nio.channels.ServerSocketChannel;
 import org.apache.commons.daemon.Daemon;
 import org.apache.commons.daemon.DaemonContext;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -97,10 +98,13 @@ public class SecureDataNodeStarter implements Daemon {
     int socketWriteTimeout = conf.getInt(
         DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
         HdfsServerConstants.WRITE_TIMEOUT);
+    int backlogLength = conf.getInt(
+        CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY,
+        CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT);
 
     ServerSocket ss = (socketWriteTimeout > 0) ? 
         ServerSocketChannel.open().socket() : new ServerSocket();
-    ss.bind(streamingAddr, 0);
+    ss.bind(streamingAddr, backlogLength);
 
     // Check that we got the port we need
     if (ss.getLocalPort() != streamingAddr.getPort()) {