Kaynağa Gözat

HADOOP-13953. Make FTPFileSystem's data connection mode and transfer mode configurable. Contributed by Xiao Chen.

(cherry picked from commit 0a212a40fcbd12a11294bff7a31e7433111733c9)

Conflicts:
   hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java
Wei-Chiu Chuang 8 yıl önce
ebeveyn
işleme
76d62b0632

+ 69 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java

@@ -23,6 +23,7 @@ import java.io.InputStream;
 import java.net.ConnectException;
 import java.net.URI;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -65,6 +66,9 @@ public class FTPFileSystem extends FileSystem {
   public static final String FS_FTP_HOST = "fs.ftp.host";
   public static final String FS_FTP_HOST_PORT = "fs.ftp.host.port";
   public static final String FS_FTP_PASSWORD_PREFIX = "fs.ftp.password.";
+  public static final String FS_FTP_DATA_CONNECTION_MODE =
+      "fs.ftp.data.connection.mode";
+  public static final String FS_FTP_TRANSFER_MODE = "fs.ftp.transfer.mode";
   public static final String E_SAME_DIRECTORY_ONLY =
       "only same directory renames are supported";
 
@@ -143,9 +147,10 @@ public class FTPFileSystem extends FileSystem {
                    NetUtils.UNKNOWN_HOST, 0,
                    new ConnectException("Server response " + reply));
     } else if (client.login(user, password)) {
-      client.setFileTransferMode(FTP.BLOCK_TRANSFER_MODE);
+      client.setFileTransferMode(getTransferMode(conf));
       client.setFileType(FTP.BINARY_FILE_TYPE);
       client.setBufferSize(DEFAULT_BUFFER_SIZE);
+      setDataConnectionMode(client, conf);
     } else {
       throw new IOException("Login failed on server - " + host + ", port - "
           + port + " as user '" + user + "'");
@@ -154,6 +159,69 @@ public class FTPFileSystem extends FileSystem {
     return client;
   }
 
+  /**
+   * Set FTP's transfer mode based on configuration. Valid values are
+   * STREAM_TRANSFER_MODE, BLOCK_TRANSFER_MODE and COMPRESSED_TRANSFER_MODE.
+   * <p/>
+   * Defaults to BLOCK_TRANSFER_MODE.
+   *
+   * @param conf
+   * @return
+   */
+  @VisibleForTesting
+  int getTransferMode(Configuration conf) {
+    final String mode = conf.get(FS_FTP_TRANSFER_MODE);
+    // FTP default is STREAM_TRANSFER_MODE, but Hadoop FTPFS's default is
+    // FTP.BLOCK_TRANSFER_MODE historically.
+    int ret = FTP.BLOCK_TRANSFER_MODE;
+    if (mode == null) {
+      return ret;
+    }
+    final String upper = mode.toUpperCase();
+    if (upper.equals("STREAM_TRANSFER_MODE")) {
+      ret = FTP.STREAM_TRANSFER_MODE;
+    } else if (upper.equals("COMPRESSED_TRANSFER_MODE")) {
+      ret = FTP.COMPRESSED_TRANSFER_MODE;
+    } else {
+      if (!upper.equals("BLOCK_TRANSFER_MODE")) {
+        LOG.warn("Cannot parse the value for " + FS_FTP_TRANSFER_MODE + ": "
+            + mode + ". Using default.");
+      }
+    }
+    return ret;
+  }
+
+  /**
+   * Set the FTPClient's data connection mode based on configuration. Valid
+   * values are ACTIVE_LOCAL_DATA_CONNECTION_MODE,
+   * PASSIVE_LOCAL_DATA_CONNECTION_MODE and PASSIVE_REMOTE_DATA_CONNECTION_MODE.
+   * <p/>
+   * Defaults to ACTIVE_LOCAL_DATA_CONNECTION_MODE.
+   *
+   * @param client
+   * @param conf
+   * @throws IOException
+   */
+  @VisibleForTesting
+  void setDataConnectionMode(FTPClient client, Configuration conf)
+      throws IOException {
+    final String mode = conf.get(FS_FTP_DATA_CONNECTION_MODE);
+    if (mode == null) {
+      return;
+    }
+    final String upper = mode.toUpperCase();
+    if (upper.equals("PASSIVE_LOCAL_DATA_CONNECTION_MODE")) {
+      client.enterLocalPassiveMode();
+    } else if (upper.equals("PASSIVE_REMOTE_DATA_CONNECTION_MODE")) {
+      client.enterRemotePassiveMode();
+    } else {
+      if (!upper.equals("ACTIVE_LOCAL_DATA_CONNECTION_MODE")) {
+        LOG.warn("Cannot parse the value for " + FS_FTP_DATA_CONNECTION_MODE
+            + ": " + mode + ". Using default.");
+      }
+    }
+  }
+
   /**
    * Logout and disconnect the given FTPClient. *
    * 

+ 18 - 0
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

@@ -763,6 +763,24 @@
   </description>
 </property>
 
+<property>
+  <name>fs.ftp.data.connection.mode</name>
+  <value>ACTIVE_LOCAL_DATA_CONNECTION_MODE</value>
+  <description>Set the FTPClient's data connection mode based on configuration.
+    Valid values are ACTIVE_LOCAL_DATA_CONNECTION_MODE,
+    PASSIVE_LOCAL_DATA_CONNECTION_MODE and PASSIVE_REMOTE_DATA_CONNECTION_MODE.
+  </description>
+</property>
+
+<property>
+  <name>fs.ftp.transfer.mode</name>
+  <value>BLOCK_TRANSFER_MODE</value>
+  <description>
+    Set FTP's transfer mode based on configuration. Valid values are
+    STREAM_TRANSFER_MODE, BLOCK_TRANSFER_MODE and COMPRESSED_TRANSFER_MODE.
+  </description>
+</property>
+
 <property>
   <name>fs.df.interval</name>
   <value>60000</value>

+ 2 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java

@@ -87,6 +87,8 @@ public class TestCommonConfigurationFields extends TestConfigurationFieldsBase {
     xmlPropsToSkipCompare.add("fs.ftp.password.localhost");
     xmlPropsToSkipCompare.add("fs.ftp.user.localhost");
     xmlPropsToSkipCompare.add("fs.s3.block.size");
+    xmlPropsToSkipCompare.add("fs.ftp.data.connection.mode");
+    xmlPropsToSkipCompare.add("fs.ftp.transfer.mode");
     xmlPropsToSkipCompare.add("hadoop.tmp.dir");
     xmlPropsToSkipCompare.add("nfs3.mountd.port");
     xmlPropsToSkipCompare.add("nfs3.server.port");

+ 54 - 2
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/ftp/TestFTPFileSystem.java

@@ -19,15 +19,67 @@ package org.apache.hadoop.fs.ftp;
 
 import org.apache.commons.net.ftp.FTP;
 
-import org.junit.Assert;
+import org.apache.commons.net.ftp.FTPClient;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test basic @{link FTPFileSystem} class methods. Contract tests are in
+ * TestFTPContractXXXX.
+ */
 public class TestFTPFileSystem {
 
+  @Rule
+  public Timeout testTimeout = new Timeout(180000);
+
   @Test
   public void testFTPDefaultPort() throws Exception {
     FTPFileSystem ftp = new FTPFileSystem();
-    Assert.assertEquals(FTP.DEFAULT_PORT, ftp.getDefaultPort());
+    assertEquals(FTP.DEFAULT_PORT, ftp.getDefaultPort());
   }
 
+  @Test
+  public void testFTPTransferMode() throws Exception {
+    Configuration conf = new Configuration();
+    FTPFileSystem ftp = new FTPFileSystem();
+    assertEquals(FTP.BLOCK_TRANSFER_MODE, ftp.getTransferMode(conf));
+
+    conf.set(FTPFileSystem.FS_FTP_TRANSFER_MODE, "STREAM_TRANSFER_MODE");
+    assertEquals(FTP.STREAM_TRANSFER_MODE, ftp.getTransferMode(conf));
+
+    conf.set(FTPFileSystem.FS_FTP_TRANSFER_MODE, "COMPRESSED_TRANSFER_MODE");
+    assertEquals(FTP.COMPRESSED_TRANSFER_MODE, ftp.getTransferMode(conf));
+
+    conf.set(FTPFileSystem.FS_FTP_TRANSFER_MODE, "invalid");
+    assertEquals(FTPClient.BLOCK_TRANSFER_MODE, ftp.getTransferMode(conf));
+  }
+
+  @Test
+  public void testFTPDataConnectionMode() throws Exception {
+    Configuration conf = new Configuration();
+    FTPClient client = new FTPClient();
+    FTPFileSystem ftp = new FTPFileSystem();
+    assertEquals(FTPClient.ACTIVE_LOCAL_DATA_CONNECTION_MODE,
+        client.getDataConnectionMode());
+
+    ftp.setDataConnectionMode(client, conf);
+    assertEquals(FTPClient.ACTIVE_LOCAL_DATA_CONNECTION_MODE,
+        client.getDataConnectionMode());
+
+    conf.set(FTPFileSystem.FS_FTP_DATA_CONNECTION_MODE, "invalid");
+    ftp.setDataConnectionMode(client, conf);
+    assertEquals(FTPClient.ACTIVE_LOCAL_DATA_CONNECTION_MODE,
+        client.getDataConnectionMode());
+
+    conf.set(FTPFileSystem.FS_FTP_DATA_CONNECTION_MODE,
+        "PASSIVE_LOCAL_DATA_CONNECTION_MODE");
+    ftp.setDataConnectionMode(client, conf);
+    assertEquals(FTPClient.PASSIVE_LOCAL_DATA_CONNECTION_MODE,
+        client.getDataConnectionMode());
+
+  }
 }