Browse Source

HDFS-8829. Make SO_RCVBUF and SO_SNDBUF size configurable for DataTransferProtocol sockets and allow configuring auto-tuning (He Tianyi via Colin P. McCabe)

Change-Id: I77dc71aaf9e14ef743f2a2cbebeec04a4f628c78
(cherry picked from commit 7b5cf5352efedc7d7ebdbb6b58f1b9a688812e75)
(cherry picked from commit 149d7315592165aee20b855b368d40db2ebd9f38)
Colin Patrick Mccabe 9 years ago
parent
commit
6f876f419d

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -517,6 +517,10 @@ Release 2.7.3 - 2016-08-25
     HDFS-8845. DiskChecker should not traverse the entire tree (Chang Li via
     Colin P. McCabe)
 
+    HDFS-8829. Make SO_RCVBUF and SO_SNDBUF size configurable for
+    DataTransferProtocol sockets and allow configuring auto-tuning (He Tianyi
+    via Colin P. McCabe)
+
   BUG FIXES
 
     HDFS-9289. Make DataStreamer#block thread safe and verify genStamp in

+ 15 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker;
 import org.apache.hadoop.http.HttpConfig;
@@ -871,4 +872,18 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
     "dfs.datanode.block-pinning.enabled";
   public static final boolean DFS_DATANODE_BLOCK_PINNING_ENABLED_DEFAULT =
     false;
+
+  public static final String
+      DFS_DATANODE_TRANSFER_SOCKET_SEND_BUFFER_SIZE_KEY =
+      "dfs.datanode.transfer.socket.send.buffer.size";
+  public static final int
+      DFS_DATANODE_TRANSFER_SOCKET_SEND_BUFFER_SIZE_DEFAULT =
+      HdfsConstants.DEFAULT_DATA_SOCKET_SIZE;
+
+  public static final String
+      DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_KEY =
+      "dfs.datanode.transfer.socket.recv.buffer.size";
+  public static final int
+      DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_DEFAULT =
+      HdfsConstants.DEFAULT_DATA_SOCKET_SIZE;
 }

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeerServer.java

@@ -49,6 +49,11 @@ public class DomainPeerServer implements PeerServer {
     sock.setAttribute(DomainSocket.RECEIVE_BUFFER_SIZE, size);
   }
 
+  @Override
+  public int getReceiveBufferSize() throws IOException {
+    return sock.getAttribute(DomainSocket.RECEIVE_BUFFER_SIZE);
+  }
+
   @Override
   public Peer accept() throws IOException, SocketTimeoutException {
     DomainSocket connSock = sock.accept();

+ 8 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/PeerServer.java

@@ -32,7 +32,14 @@ public interface PeerServer extends Closeable {
   public void setReceiveBufferSize(int size) throws IOException;
 
   /**
-   * Listens for a connection to be made to this server and accepts 
+   * Get the receive buffer size of the PeerServer.
+   *
+   * @return     The receive buffer size.
+   */
+  int getReceiveBufferSize() throws IOException;
+
+  /**
+   * Listens for a connection to be made to this server and accepts
    * it. The method blocks until a connection is made.
    *
    * @exception IOException  if an I/O error occurs when waiting for a

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java

@@ -136,6 +136,11 @@ public class TcpPeerServer implements PeerServer {
     this.serverSocket.setReceiveBufferSize(size);
   }
 
+  @Override
+  public int getReceiveBufferSize() throws IOException {
+    return this.serverSocket.getReceiveBufferSize();
+  }
+
   @Override
   public Peer accept() throws IOException, SocketTimeoutException {
     Peer peer = peerFromSocket(serverSocket.accept());

+ 19 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java

@@ -70,7 +70,9 @@ public class DNConf {
   final int socketTimeout;
   final int socketWriteTimeout;
   final int socketKeepaliveTimeout;
-  
+  private final int transferSocketSendBufferSize;
+  private final int transferSocketRecvBufferSize;
+
   final boolean transferToAllowed;
   final boolean dropCacheBehindWrites;
   final boolean syncBehindWrites;
@@ -113,8 +115,14 @@ public class DNConf {
     socketKeepaliveTimeout = conf.getInt(
         DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY,
         DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT);
-    
-    /* Based on results on different platforms, we might need set the default 
+    this.transferSocketSendBufferSize = conf.getInt(
+        DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_SEND_BUFFER_SIZE_KEY,
+        DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_SEND_BUFFER_SIZE_DEFAULT);
+    this.transferSocketRecvBufferSize = conf.getInt(
+        DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_KEY,
+        DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_DEFAULT);
+
+    /* Based on results on different platforms, we might need set the default
      * to false on some of them. */
     transferToAllowed = conf.getBoolean(
         DFS_DATANODE_TRANSFERTO_ALLOWED_KEY,
@@ -280,4 +288,12 @@ public class DNConf {
   public long getBpReadyTimeout() {
     return bpReadyTimeout;
   }
+
+  public int getTransferSocketRecvBufferSize() {
+    return transferSocketRecvBufferSize;
+  }
+
+  public int getTransferSocketSendBufferSize() {
+    return transferSocketSendBufferSize;
+  }
 }

+ 10 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -905,7 +905,10 @@ public class DataNode extends ReconfigurableBase
       tcpPeerServer = new TcpPeerServer(dnConf.socketWriteTimeout,
           DataNode.getStreamingAddr(conf), backlogLength);
     }
-    tcpPeerServer.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
+    if (dnConf.getTransferSocketRecvBufferSize() > 0) {
+      tcpPeerServer.setReceiveBufferSize(
+          dnConf.getTransferSocketRecvBufferSize());
+    }
     streamingAddr = tcpPeerServer.getStreamingAddr();
     LOG.info("Opened streaming server at " + streamingAddr);
     this.threadGroup = new ThreadGroup("dataXceiverServer");
@@ -953,8 +956,12 @@ public class DataNode extends ReconfigurableBase
     }
     DomainPeerServer domainPeerServer =
       new DomainPeerServer(domainSocketPath, port);
-    domainPeerServer.setReceiveBufferSize(
-        HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
+    int recvBufferSize = conf.getInt(
+        DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_KEY,
+        DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_DEFAULT);
+    if (recvBufferSize > 0) {
+      domainPeerServer.setReceiveBufferSize(recvBufferSize);
+    }
     return domainPeerServer;
   }
   

+ 5 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

@@ -707,8 +707,11 @@ class DataXceiver extends Receiver implements Runnable {
                       (HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * targets.length);
           NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
           mirrorSock.setSoTimeout(timeoutValue);
-          mirrorSock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
-          
+          if (dnConf.getTransferSocketSendBufferSize() > 0) {
+            mirrorSock.setSendBufferSize(
+                dnConf.getTransferSocketSendBufferSize());
+          }
+
           OutputStream unbufMirrorOut = NetUtils.getOutputStream(mirrorSock,
               writeTimeout);
           InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock);

+ 6 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java

@@ -278,7 +278,12 @@ class DataXceiverServer implements Runnable {
   synchronized int getNumPeersXceiver() {
     return peersXceiver.size();
   }
-  
+
+  @VisibleForTesting
+  PeerServer getPeerServer() {
+    return peerServer;
+  }
+
   synchronized void releasePeer(Peer peer) {
     peers.remove(peer);
     peersXceiver.remove(peer);

+ 22 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -2640,4 +2640,26 @@
     </description>
   </property>
 
+<property>
+  <name>dfs.datanode.transfer.socket.send.buffer.size</name>
+  <value>131072</value>
+  <description>
+    Socket send buffer size for DataXceiver (mirroring packets to downstream
+    in pipeline). This may affect TCP connection throughput.
+    If it is set to zero or negative value, no buffer size will be set
+    explicitly, thus enable tcp auto-tuning on some system.
+  </description>
+</property>
+
+<property>
+  <name>dfs.datanode.transfer.socket.recv.buffer.size</name>
+  <value>131072</value>
+  <description>
+    Socket receive buffer size for DataXceiver (receiving packets from client
+    during block writing). This may affect TCP connection throughput.
+    If it is set to zero or negative value, no buffer size will be set
+    explicitly, thus enable tcp auto-tuning on some system.
+  </description>
+</property>
+
 </configuration>

+ 71 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeTransferSocketSize.java

@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.Test;
+
+public class TestDataNodeTransferSocketSize {
+
+  @Test
+  public void testSpecifiedDataSocketSize() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(
+      DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_KEY, 4 * 1024);
+    SimulatedFSDataset.setFactory(conf);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    try {
+      List<DataNode> datanodes = cluster.getDataNodes();
+      DataNode datanode = datanodes.get(0);
+      assertEquals("Receive buffer size should be 4K",
+        4 * 1024, datanode.getXferServer().getPeerServer().getReceiveBufferSize());
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  @Test
+  public void testAutoTuningDataSocketSize() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(
+      DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_KEY, 0);
+    SimulatedFSDataset.setFactory(conf);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    try {
+      List<DataNode> datanodes = cluster.getDataNodes();
+      DataNode datanode = datanodes.get(0);
+      assertTrue(
+        "Receive buffer size should be a default value (determined by kernel)",
+        datanode.getXferServer().getPeerServer().getReceiveBufferSize() > 0);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+}