Преглед на файлове

HDFS-9914. Fix configurable WebhDFS connect/read timeout. Contributed by Xiaoyu Yao.

Arpit Agarwal преди 7 години
родител
ревизия
2d4629f386

+ 36 - 25
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/URLConnectionFactory.java

@@ -23,7 +23,6 @@ import java.net.HttpURLConnection;
 import java.net.URL;
 import java.net.URLConnection;
 import java.security.GeneralSecurityException;
-import java.util.concurrent.TimeUnit;
 
 import javax.net.ssl.HostnameVerifier;
 import javax.net.ssl.HttpsURLConnection;
@@ -32,7 +31,6 @@ import javax.net.ssl.SSLSocketFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.web.oauth2.OAuth2ConnectionConfigurator;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
@@ -84,22 +82,48 @@ public class URLConnectionFactory {
    */
   public static URLConnectionFactory newDefaultURLConnectionFactory(
       Configuration conf) {
-    ConnectionConfigurator conn = getSSLConnectionConfiguration(conf);
+    ConnectionConfigurator conn = getSSLConnectionConfiguration(
+        DEFAULT_SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, conf);
 
     return new URLConnectionFactory(conn);
   }
 
+  /**
+   * Construct a new URLConnectionFactory based on the configuration. It will
+   * hornor connecTimeout and readTimeout when they are specified.
+   */
+  public static URLConnectionFactory newDefaultURLConnectionFactory(
+      int connectTimeout, int readTimeout, Configuration conf) {
+    ConnectionConfigurator conn = getSSLConnectionConfiguration(
+        connectTimeout, readTimeout, conf);
+    return new URLConnectionFactory(conn);
+  }
+
   private static ConnectionConfigurator getSSLConnectionConfiguration(
-      Configuration conf) {
+      final int connectTimeout, final int readTimeout, Configuration conf) {
     ConnectionConfigurator conn;
     try {
-      conn = newSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, conf);
+      conn = newSslConnConfigurator(connectTimeout, readTimeout, conf);
     } catch (Exception e) {
       LOG.warn(
           "Cannot load customized ssl related configuration. Fallback to" +
               " system-generic settings.",
           e);
-      conn = DEFAULT_TIMEOUT_CONN_CONFIGURATOR;
+      if (connectTimeout == DEFAULT_SOCKET_TIMEOUT &&
+          readTimeout == DEFAULT_SOCKET_TIMEOUT) {
+        conn = DEFAULT_TIMEOUT_CONN_CONFIGURATOR;
+      } else {
+        conn = new ConnectionConfigurator() {
+          @Override
+          public HttpURLConnection configure(HttpURLConnection connection)
+              throws IOException {
+            URLConnectionFactory.setTimeouts(connection,
+                connectTimeout,
+                readTimeout);
+            return connection;
+          }
+        };
+      }
     }
 
     return conn;
@@ -110,11 +134,12 @@ public class URLConnectionFactory {
    * It will also try to load the SSL configuration when they are specified.
    */
   public static URLConnectionFactory newOAuth2URLConnectionFactory(
-      Configuration conf) throws IOException {
+      int connectTimeout, int readTimeout, Configuration conf)
+      throws IOException {
     ConnectionConfigurator conn;
     try {
       ConnectionConfigurator sslConnConfigurator
-          = newSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, conf);
+          = newSslConnConfigurator(connectTimeout, readTimeout, conf);
 
       conn = new OAuth2ConnectionConfigurator(conf, sslConnConfigurator);
     } catch (Exception e) {
@@ -128,33 +153,18 @@ public class URLConnectionFactory {
     this.connConfigurator = connConfigurator;
   }
 
-  /**
-   * Create a new ConnectionConfigurator for SSL connections
-   */
   private static ConnectionConfigurator newSslConnConfigurator(
-      final int defaultTimeout, Configuration conf)
+      final int connectTimeout, final int readTimeout, Configuration conf)
       throws IOException, GeneralSecurityException {
     final SSLFactory factory;
     final SSLSocketFactory sf;
     final HostnameVerifier hv;
-    final int connectTimeout;
-    final int readTimeout;
 
     factory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
     factory.init();
     sf = factory.createSSLSocketFactory();
     hv = factory.getHostnameVerifier();
 
-    connectTimeout = (int) conf.getTimeDuration(
-        HdfsClientConfigKeys.DFS_WEBHDFS_SOCKET_CONNECT_TIMEOUT_KEY,
-        defaultTimeout,
-        TimeUnit.MILLISECONDS);
-
-    readTimeout = (int) conf.getTimeDuration(
-        HdfsClientConfigKeys.DFS_WEBHDFS_SOCKET_READ_TIMEOUT_KEY,
-        defaultTimeout,
-        TimeUnit.MILLISECONDS);
-
     return new ConnectionConfigurator() {
       @Override
       public HttpURLConnection configure(HttpURLConnection conn)
@@ -222,7 +232,8 @@ public class URLConnectionFactory {
    *
    * @param connection
    *          URLConnection to set
-   * @param socketTimeout
+   * @param connectTimeout
+   * @param readTimeout
    *          the connection and read timeout of the connection.
    */
   private static void setTimeouts(URLConnection connection,

+ 14 - 2
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java

@@ -46,6 +46,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.StringTokenizer;
+import java.util.concurrent.TimeUnit;
 
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
@@ -186,6 +187,17 @@ public class WebHdfsFileSystem extends FileSystem
         HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY,
         HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_DEFAULT));
 
+    int connectTimeout = (int) conf.getTimeDuration(
+        HdfsClientConfigKeys.DFS_WEBHDFS_SOCKET_CONNECT_TIMEOUT_KEY,
+        URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT,
+        TimeUnit.MILLISECONDS);
+
+    int readTimeout = (int) conf.getTimeDuration(
+        HdfsClientConfigKeys.DFS_WEBHDFS_SOCKET_READ_TIMEOUT_KEY,
+        URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT,
+        TimeUnit.MILLISECONDS);
+
+
     boolean isOAuth = conf.getBoolean(
         HdfsClientConfigKeys.DFS_WEBHDFS_OAUTH_ENABLED_KEY,
         HdfsClientConfigKeys.DFS_WEBHDFS_OAUTH_ENABLED_DEFAULT);
@@ -193,11 +205,11 @@ public class WebHdfsFileSystem extends FileSystem
     if(isOAuth) {
       LOG.debug("Enabling OAuth2 in WebHDFS");
       connectionFactory = URLConnectionFactory
-          .newOAuth2URLConnectionFactory(conf);
+          .newOAuth2URLConnectionFactory(connectTimeout, readTimeout, conf);
     } else {
       LOG.debug("Not enabling OAuth2 in WebHDFS");
       connectionFactory = URLConnectionFactory
-          .newDefaultURLConnectionFactory(conf);
+          .newDefaultURLConnectionFactory(connectTimeout, readTimeout, conf);
     }
 
     ugi = UserGroupInformation.getCurrentUser();