Selaa lähdekoodia

HADOOP-13212 Provide an option to set the socket buffers in S3AFileSystem (Rajesh Balamohan)

Steve Loughran 8 vuotta sitten
vanhempi
commit
0f1e02a298

+ 12 - 0
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

@@ -865,6 +865,18 @@
   <description>Socket connection timeout in milliseconds.</description>
 </property>
 
+<property>
+  <name>fs.s3a.socket.send.buffer</name>
+  <value>8192</value>
+  <description>Socket send buffer hint to amazon connector. Represented in bytes.</description>
+</property>
+
+<property>
+  <name>fs.s3a.socket.recv.buffer</name>
+  <value>8192</value>
+  <description>Socket receive buffer hint to amazon connector. Represented in bytes.</description>
+</property>
+
 <property>
   <name>fs.s3a.paging.maximum</name>
   <value>5000</value>

+ 8 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

@@ -82,6 +82,14 @@ public final class Constants {
   public static final String SOCKET_TIMEOUT = "fs.s3a.connection.timeout";
   public static final int DEFAULT_SOCKET_TIMEOUT = 200000;
 
+  // socket send buffer to be used in Amazon client
+  public static final String SOCKET_SEND_BUFFER = "fs.s3a.socket.send.buffer";
+  public static final int DEFAULT_SOCKET_SEND_BUFFER = 8 * 1024;
+
+  // socket send buffer to be used in Amazon client
+  public static final String SOCKET_RECV_BUFFER = "fs.s3a.socket.recv.buffer";
+  public static final int DEFAULT_SOCKET_RECV_BUFFER = 8 * 1024;
+
   // number of records to get while paging through a directory listing
   public static final String MAX_PAGING_KEYS = "fs.s3a.paging.maximum";
   public static final int DEFAULT_MAX_PAGING_KEYS = 5000;

+ 5 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

@@ -166,6 +166,11 @@ public class S3AFileSystem extends FileSystem {
           DEFAULT_ESTABLISH_TIMEOUT, 0));
       awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT,
           DEFAULT_SOCKET_TIMEOUT, 0));
+      int sockSendBuffer = intOption(conf, SOCKET_SEND_BUFFER,
+          DEFAULT_SOCKET_SEND_BUFFER, 2048);
+      int sockRecvBuffer = intOption(conf, SOCKET_RECV_BUFFER,
+          DEFAULT_SOCKET_RECV_BUFFER, 2048);
+      awsConf.setSocketBufferSizeHints(sockSendBuffer, sockRecvBuffer);
       String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, "");
       if (!signerOverride.isEmpty()) {
         LOG.debug("Signer override = {}", signerOverride);

+ 12 - 0
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md

@@ -500,6 +500,18 @@ this capability.
       which each use a thread from the threadpool.</description>
     </property>
 
+    <property>
+      <name>fs.s3a.socket.send.buffer</name>
+      <value>8192</value>
+      <description>Socket send buffer hint to amazon connector. Represented in bytes.</description>
+    </property>
+
+    <property>
+      <name>fs.s3a.socket.recv.buffer</name>
+      <value>8192</value>
+      <description>Socket receive buffer hint to amazon connector. Represented in bytes.</description>
+    </property>
+
     <property>
       <name>fs.s3a.threads.keepalivetime</name>
       <value>60</value>

+ 3 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3AInputStreamPerformance.java

@@ -44,6 +44,7 @@ import java.io.EOFException;
 import java.io.IOException;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
+import static org.apache.hadoop.fs.s3a.Constants.*;
 
 /**
  * Look at the performance of S3a operations.
@@ -71,6 +72,8 @@ public class TestS3AInputStreamPerformance extends S3AScaleTestBase {
   @Before
   public void openFS() throws IOException {
     Configuration conf = getConf();
+    conf.setInt(SOCKET_SEND_BUFFER, 16 * 1024);
+    conf.setInt(SOCKET_RECV_BUFFER, 16 * 1024);
     String testFile =  conf.getTrimmed(KEY_CSVTEST_FILE, DEFAULT_CSVTEST_FILE);
     if (testFile.isEmpty()) {
       assumptionMessage = "Empty test property: " + KEY_CSVTEST_FILE;