瀏覽代碼

HDFS-7224. Allow reuse of NN connections via webhdfs. Contributed by Eric Payne

Kihwal Lee 10 年之前
父節點
當前提交
2b0fa20f69

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -546,6 +546,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7623. Add htrace configuration properties to core-default.xml and
     update user doc about how to enable htrace. (yliu)
 
+    HDFS-7224. Allow reuse of NN connections via webhdfs (Eric Payne via
+    kihwal)
+
   OPTIMIZATIONS
 
     HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

+ 12 - 8
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java

@@ -312,16 +312,20 @@ public class WebHdfsFileSystem extends FileSystem
     if (in == null) {
       throw new IOException("The " + (useErrorStream? "error": "input") + " stream is null.");
     }
-    final String contentType = c.getContentType();
-    if (contentType != null) {
-      final MediaType parsed = MediaType.valueOf(contentType);
-      if (!MediaType.APPLICATION_JSON_TYPE.isCompatible(parsed)) {
-        throw new IOException("Content-Type \"" + contentType
-            + "\" is incompatible with \"" + MediaType.APPLICATION_JSON
-            + "\" (parsed=\"" + parsed + "\")");
+    try {
+      final String contentType = c.getContentType();
+      if (contentType != null) {
+        final MediaType parsed = MediaType.valueOf(contentType);
+        if (!MediaType.APPLICATION_JSON_TYPE.isCompatible(parsed)) {
+          throw new IOException("Content-Type \"" + contentType
+              + "\" is incompatible with \"" + MediaType.APPLICATION_JSON
+              + "\" (parsed=\"" + parsed + "\")");
+        }
       }
+      return (Map<?, ?>)JSON.parse(new InputStreamReader(in, Charsets.UTF_8));
+    } finally {
+      in.close();
     }
-    return (Map<?, ?>)JSON.parse(new InputStreamReader(in, Charsets.UTF_8));
   }
 
   private static Map<?, ?> validateResponse(final HttpOpParam.Op op,

+ 49 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestFSMainOperationsWebHdfs.java

@@ -17,8 +17,14 @@
  */
 package org.apache.hadoop.hdfs.web;
 
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.doReturn;
+
 import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
 import java.net.URI;
+import java.net.URL;
 import java.security.PrivilegedExceptionAction;
 
 import org.apache.commons.logging.impl.Log4JLogger;
@@ -32,6 +38,8 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.web.resources.ExceptionHandler;
+import org.apache.hadoop.hdfs.web.resources.GetOpParam;
+import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.log4j.Level;
@@ -128,6 +136,47 @@ public class TestFSMainOperationsWebHdfs extends FSMainOperationsBaseTest {
     Assert.assertEquals(1024*4, fileStatus.getLen());
   }
 
+  // Test that WebHdfsFileSystem.jsonParse() closes the connection's input
+  // stream.
+  // Closing the inputstream in jsonParse will allow WebHDFS to reuse
+  // connections to the namenode rather than needing to always open new ones.
+  boolean closedInputStream = false;
+  @Test
+  public void testJsonParseClosesInputStream() throws Exception {
+    final WebHdfsFileSystem webhdfs = (WebHdfsFileSystem)fileSystem;
+    Path file = getTestRootPath(fSys, "test/hadoop/file");
+    createFile(file);
+    final HttpOpParam.Op op = GetOpParam.Op.GETHOMEDIRECTORY;
+    final URL url = webhdfs.toUrl(op, file);
+    final HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+    conn.setRequestMethod(op.getType().toString());
+    conn.connect();
+
+    InputStream myIn = new InputStream(){
+      private HttpURLConnection localConn = conn;
+      @Override
+      public void close() throws IOException {
+        closedInputStream = true;
+        localConn.getInputStream().close();
+      }
+      @Override
+      public int read() throws IOException {
+        return localConn.getInputStream().read();
+      }
+    };
+    final HttpURLConnection spyConn = spy(conn);
+    doReturn(myIn).when(spyConn).getInputStream();
+
+    try {
+      Assert.assertFalse(closedInputStream);
+      WebHdfsFileSystem.jsonParse(spyConn, false);
+      Assert.assertTrue(closedInputStream);
+    } catch(IOException ioe) {
+      junit.framework.TestCase.fail();
+    }
+    conn.disconnect();
+  }
+
   @Override
   @Test
   public void testMkdirsFailsForSubdirectoryOfExistingFile() throws Exception {