瀏覽代碼

HADOOP-17528. SFTP File System: close the connection pool when closing a FileSystem (#2701)

Contributed by Mike Pryakhin.
Mike 4 年之前
父節點
當前提交
7b7c0019f4

+ 30 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java

@@ -24,6 +24,7 @@ import java.net.URI;
 import java.net.URLDecoder;
 import java.util.ArrayList;
 import java.util.Vector;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -50,6 +51,7 @@ public class SFTPFileSystem extends FileSystem {
 
   private SFTPConnectionPool connectionPool;
   private URI uri;
+  private final AtomicBoolean closed = new AtomicBoolean(false);
 
   private static final int DEFAULT_SFTP_PORT = 22;
   private static final int DEFAULT_MAX_CONNECTION = 5;
@@ -83,6 +85,7 @@ public class SFTPFileSystem extends FileSystem {
       "Destination path %s already exist, cannot rename!";
   public static final String E_FAILED_GETHOME = "Failed to get home directory";
   public static final String E_FAILED_DISCONNECT = "Failed to disconnect";
+  public static final String E_FS_CLOSED = "FileSystem is closed!";
 
   /**
    * Set configuration from UI.
@@ -138,8 +141,9 @@ public class SFTPFileSystem extends FileSystem {
    * @throws IOException
    */
   private ChannelSftp connect() throws IOException {
-    Configuration conf = getConf();
+    checkNotClosed();
 
+    Configuration conf = getConf();
     String host = conf.get(FS_SFTP_HOST, null);
     int port = conf.getInt(FS_SFTP_HOST_PORT, DEFAULT_SFTP_PORT);
     String user = conf.get(FS_SFTP_USER_PREFIX + host, null);
@@ -703,6 +707,31 @@ public class SFTPFileSystem extends FileSystem {
     }
   }
 
+  @Override
+  public void close() throws IOException {
+    if (closed.getAndSet(true)) {
+      return;
+    }
+    try {
+      super.close();
+    } finally {
+      if (connectionPool != null) {
+        connectionPool.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Verify that the input stream is open. Non blocking; this gives
+   * the last state of the volatile {@link #closed} field.
+   * @throws IOException if the connection is closed.
+   */
+  private void checkNotClosed() throws IOException {
+    if (closed.get()) {
+      throw new IOException(uri + ": " + E_FS_CLOSED);
+    }
+  }
+
   @VisibleForTesting
   SFTPConnectionPool getConnectionPool() {
     return connectionPool;

+ 11 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/sftp/TestSFTPFileSystem.java

@@ -374,4 +374,15 @@ public class TestSFTPFileSystem {
     assertThat(((SFTPFileSystem) sftpFs).getConnectionPool().getLiveConnCount(),
         is(1));
   }
+
+  @Test
+  public void testCloseFileSystemClosesConnectionPool() throws Exception {
+    SFTPFileSystem fs = (SFTPFileSystem) sftpFs;
+    fs.getHomeDirectory();
+    assertThat(fs.getConnectionPool().getLiveConnCount(), is(1));
+    fs.close();
+    assertThat(fs.getConnectionPool().getLiveConnCount(), is(0));
+    ///making sure that re-entrant close calls are safe
+    fs.close();
+  }
 }