浏览代码

HADOOP-15358. SFTPConnectionPool connections leakage. Contributed by Mikhail Pryakhin.

Márton Elek 6 年之前
父节点
当前提交
6934a65402

+ 38 - 6
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java

@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.Progressable;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.jcraft.jsch.ChannelSftp;
 import com.jcraft.jsch.ChannelSftp.LsEntry;
 import com.jcraft.jsch.SftpATTRS;
@@ -219,7 +220,7 @@ public class SFTPFileSystem extends FileSystem {
       Path root = new Path("/");
       return new FileStatus(length, isDir, blockReplication, blockSize,
           modTime,
-          root.makeQualified(this.getUri(), this.getWorkingDirectory()));
+          root.makeQualified(this.getUri(), this.getWorkingDirectory(client)));
     }
     String pathName = parentPath.toUri().getPath();
     Vector<LsEntry> sftpFiles;
@@ -289,7 +290,7 @@ public class SFTPFileSystem extends FileSystem {
 
     return new FileStatus(length, isDir, blockReplication, blockSize, modTime,
         accessTime, permission, user, group, filePath.makeQualified(
-            this.getUri(), this.getWorkingDirectory()));
+            this.getUri(), this.getWorkingDirectory(channel)));
   }
 
   /**
@@ -524,10 +525,13 @@ public class SFTPFileSystem extends FileSystem {
     } catch (SftpException e) {
       throw new IOException(e);
     }
-
-    FSDataInputStream fis =
-        new FSDataInputStream(new SFTPInputStream(is, channel, statistics));
-    return fis;
+    return new FSDataInputStream(new SFTPInputStream(is, statistics)){
+      @Override
+      public void close() throws IOException {
+        super.close();
+        disconnect(channel);
+      }
+    };
   }
 
   /**
@@ -636,6 +640,16 @@ public class SFTPFileSystem extends FileSystem {
     return getHomeDirectory();
   }
 
+  /**
+   * Convenience method, so that we don't open a new connection when using this
+   * method from within another method. Otherwise every API invocation incurs
+   * the overhead of opening/closing a TCP connection.
+   */
+  private Path getWorkingDirectory(ChannelSftp client) {
+    // Return home directory always since we do not maintain state.
+    return getHomeDirectory(client);
+  }
+
   @Override
   public Path getHomeDirectory() {
     ChannelSftp channel = null;
@@ -654,6 +668,19 @@ public class SFTPFileSystem extends FileSystem {
     }
   }
 
+  /**
+   * Convenience method, so that we don't open a new connection when using this
+   * method from within another method. Otherwise every API invocation incurs
+   * the overhead of opening/closing a TCP connection.
+   */
+  private Path getHomeDirectory(ChannelSftp channel) {
+    try {
+      return new Path(channel.pwd());
+    } catch (Exception ioe) {
+      return null;
+    }
+  }
+
   @Override
   public boolean mkdirs(Path f, FsPermission permission) throws IOException {
     ChannelSftp client = connect();
@@ -675,4 +702,9 @@ public class SFTPFileSystem extends FileSystem {
       disconnect(channel);
     }
   }
+
+  @VisibleForTesting
+  SFTPConnectionPool getConnectionPool() {
+    return connectionPool;
+  }
 }

+ 2 - 26
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java

@@ -22,39 +22,25 @@ import java.io.InputStream;
 
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.util.StringUtils;
-
-import com.jcraft.jsch.ChannelSftp;
-import com.jcraft.jsch.JSchException;
-import com.jcraft.jsch.Session;
 
 /** SFTP FileSystem input stream. */
 class SFTPInputStream extends FSInputStream {
 
   public static final String E_SEEK_NOTSUPPORTED = "Seek not supported";
-  public static final String E_CLIENT_NULL =
-      "SFTP client null or not connected";
   public static final String E_NULL_INPUTSTREAM = "Null InputStream";
   public static final String E_STREAM_CLOSED = "Stream closed";
-  public static final String E_CLIENT_NOTCONNECTED = "Client not connected";
 
   private InputStream wrappedStream;
-  private ChannelSftp channel;
   private FileSystem.Statistics stats;
   private boolean closed;
   private long pos;
 
-  SFTPInputStream(InputStream stream, ChannelSftp channel,
-      FileSystem.Statistics stats) {
+  SFTPInputStream(InputStream stream,  FileSystem.Statistics stats) {
 
     if (stream == null) {
       throw new IllegalArgumentException(E_NULL_INPUTSTREAM);
     }
-    if (channel == null || !channel.isConnected()) {
-      throw new IllegalArgumentException(E_CLIENT_NULL);
-    }
     this.wrappedStream = stream;
-    this.channel = channel;
     this.stats = stats;
 
     this.pos = 0;
@@ -114,17 +100,7 @@ class SFTPInputStream extends FSInputStream {
       return;
     }
     super.close();
+    wrappedStream.close();
     closed = true;
-    if (!channel.isConnected()) {
-      throw new IOException(E_CLIENT_NOTCONNECTED);
-    }
-
-    try {
-      Session session = channel.getSession();
-      channel.disconnect();
-      session.disconnect();
-    } catch (JSchException e) {
-      throw new IOException(StringUtils.stringifyException(e));
-    }
   }
 }

+ 59 - 17
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/sftp/TestSFTPFileSystem.java

@@ -34,25 +34,31 @@ import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.test.GenericTestUtils;
 
-import org.apache.sshd.server.SshServer;
+import static org.apache.hadoop.test.PlatformAssumptions.assumeNotWindows;
 import org.apache.sshd.common.NamedFactory;
 import org.apache.sshd.server.Command;
-import org.apache.sshd.server.auth.password.PasswordAuthenticator;
+import org.apache.sshd.server.SshServer;
 import org.apache.sshd.server.auth.UserAuth;
+import org.apache.sshd.server.auth.password.PasswordAuthenticator;
 import org.apache.sshd.server.auth.password.UserAuthPasswordFactory;
 import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider;
 import org.apache.sshd.server.session.ServerSession;
-
 import org.apache.sshd.server.subsystem.sftp.SftpSubsystemFactory;
+import static org.hamcrest.core.Is.is;
+import org.junit.After;
 import org.junit.AfterClass;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
 
-import static org.apache.hadoop.test.PlatformAssumptions.assumeNotWindows;
-import static org.junit.Assert.*;
-
 public class TestSFTPFileSystem {
 
   private static final String TEST_SFTP_DIR = "testsftp";
@@ -64,8 +70,9 @@ public class TestSFTPFileSystem {
   private static final String connection = "sftp://user:password@localhost";
   private static Path localDir = null;
   private static FileSystem localFs = null;
-  private static FileSystem sftpFs = null;
+  private FileSystem sftpFs = null;
   private static SshServer sshd = null;
+  private static Configuration conf = null;
   private static int port;
 
   private static void startSshdServer() throws IOException {
@@ -98,6 +105,22 @@ public class TestSFTPFileSystem {
     port = sshd.getPort();
   }
 
+  @Before
+  public void init() throws Exception {
+    sftpFs = FileSystem.get(URI.create(connection), conf);
+  }
+
+  @After
+  public void cleanUp() throws Exception {
+    if (sftpFs != null) {
+      try {
+        sftpFs.close();
+      } catch (IOException e) {
+        // ignore
+      }
+    }
+  }
+
   @BeforeClass
   public static void setUp() throws Exception {
     // skip all tests if running on Windows
@@ -105,7 +128,7 @@ public class TestSFTPFileSystem {
 
     startSshdServer();
 
-    Configuration conf = new Configuration();
+    conf = new Configuration();
     conf.setClass("fs.sftp.impl", SFTPFileSystem.class, FileSystem.class);
     conf.setInt("fs.sftp.host.port", port);
     conf.setBoolean("fs.sftp.impl.disable.cache", true);
@@ -116,8 +139,6 @@ public class TestSFTPFileSystem {
       localFs.delete(localDir, true);
     }
     localFs.mkdirs(localDir);
-
-    sftpFs = FileSystem.get(URI.create(connection), conf);
   }
 
   @AfterClass
@@ -130,13 +151,6 @@ public class TestSFTPFileSystem {
         // ignore
       }
     }
-    if (sftpFs != null) {
-      try {
-        sftpFs.close();
-      } catch (IOException e) {
-        // ignore
-      }
-    }
     if (sshd != null) {
       try {
         sshd.stop(true);
@@ -179,6 +193,8 @@ public class TestSFTPFileSystem {
     assertTrue(localFs.exists(file));
     assertTrue(sftpFs.delete(file, false));
     assertFalse(localFs.exists(file));
+    assertThat(((SFTPFileSystem) sftpFs).getConnectionPool().getLiveConnCount(),
+        is(1));
   }
 
   /**
@@ -194,6 +210,8 @@ public class TestSFTPFileSystem {
     assertTrue(sftpFs.delete(file, false));
     assertFalse(sftpFs.exists(file));
     assertFalse(localFs.exists(file));
+    assertThat(((SFTPFileSystem) sftpFs).getConnectionPool().getLiveConnCount(),
+        is(1));
   }
 
   /**
@@ -217,6 +235,8 @@ public class TestSFTPFileSystem {
       }
     }
     assertTrue(sftpFs.delete(file, false));
+    assertThat(((SFTPFileSystem) sftpFs).getConnectionPool().getLiveConnCount(),
+        is(1));
   }
 
   /**
@@ -238,6 +258,8 @@ public class TestSFTPFileSystem {
     assertEquals(data.length, sstat.getLen());
     assertEquals(lstat.getLen(), sstat.getLen());
     assertTrue(sftpFs.delete(file, false));
+    assertThat(((SFTPFileSystem) sftpFs).getConnectionPool().getLiveConnCount(),
+        is(1));
   }
 
   /**
@@ -249,6 +271,8 @@ public class TestSFTPFileSystem {
   public void testDeleteNonEmptyDir() throws Exception {
     Path file = touch(localFs, name.getMethodName().toLowerCase());
     sftpFs.delete(localDir, false);
+    assertThat(((SFTPFileSystem) sftpFs).getConnectionPool().getLiveConnCount(),
+        is(1));
   }
 
   /**
@@ -260,6 +284,8 @@ public class TestSFTPFileSystem {
   public void testDeleteNonExistFile() throws Exception {
     Path file = new Path(localDir, name.getMethodName().toLowerCase());
     assertFalse(sftpFs.delete(file, false));
+    assertThat(((SFTPFileSystem) sftpFs).getConnectionPool().getLiveConnCount(),
+        is(1));
   }
 
   /**
@@ -282,6 +308,8 @@ public class TestSFTPFileSystem {
     assertFalse(localFs.exists(file1));
 
     assertTrue(sftpFs.delete(file2, false));
+    assertThat(((SFTPFileSystem) sftpFs).getConnectionPool().getLiveConnCount(),
+        is(1));
   }
 
   /**
@@ -319,6 +347,8 @@ public class TestSFTPFileSystem {
     accessTime1 = (accessTime1 / 1000) * 1000;
     long accessTime2 = sftpFs.getFileStatus(file).getAccessTime();
     assertEquals(accessTime1, accessTime2);
+    assertThat(((SFTPFileSystem) sftpFs).getConnectionPool().getLiveConnCount(),
+        is(1));
   }
 
   @Test
@@ -330,6 +360,18 @@ public class TestSFTPFileSystem {
     modifyTime1 = (modifyTime1 / 1000) * 1000;
     long modifyTime2 = sftpFs.getFileStatus(file).getModificationTime();
     assertEquals(modifyTime1, modifyTime2);
+    assertThat(((SFTPFileSystem) sftpFs).getConnectionPool().getLiveConnCount(),
+        is(1));
   }
 
+  @Test
+  public void testMkDirs() throws IOException {
+    Path path = new Path(localDir.toUri().getPath(),
+        new Path(name.getMethodName(), "subdirectory"));
+    sftpFs.mkdirs(path);
+    assertTrue(localFs.exists(path));
+    assertTrue(localFs.getFileStatus(path).isDirectory());
+    assertThat(((SFTPFileSystem) sftpFs).getConnectionPool().getLiveConnCount(),
+        is(1));
+  }
 }