浏览代码

svn merge -c 1589528 merging from trunk to branch-2 to fix:HDFS-6217. Webhdfs PUT operations may not work via a http proxy. Contributed by Daryn Sharp.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1589529 13f79535-47bb-0310-9956-ffa450edef68
Kihwal Lee 11 年之前
父节点
当前提交
8bcb8dba51

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -127,6 +127,9 @@ Release 2.5.0 - UNRELEASED
     HDFS-6275. Fix warnings - type arguments can be inferred and redudant
     local variable. (suresh)
 
+    HDFS-6217. Webhdfs PUT operations may not work via a http proxy.
+    (Daryn Sharp via kihwal)
+
 Release 2.4.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 18 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java

@@ -490,8 +490,25 @@ public class WebHdfsFileSystem extends FileSystem
 
     private void connect(boolean doOutput) throws IOException {
       conn.setRequestMethod(op.getType().toString());
-      conn.setDoOutput(doOutput);
       conn.setInstanceFollowRedirects(false);
+      switch (op.getType()) {
+        // if not sending a message body for a POST or PUT operation, need
+        // to ensure the server/proxy knows this 
+        case POST:
+        case PUT: {
+          conn.setDoOutput(true);
+          if (!doOutput) {
+            // explicitly setting content-length to 0 won't do spnego!!
+            // opening and closing the stream will send "Content-Length: 0"
+            conn.getOutputStream().close();
+          }
+          break;
+        }
+        default: {
+          conn.setDoOutput(doOutput);
+          break;
+        }
+      }
       conn.connect();
     }
 

+ 197 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsContentLength.java

@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.web;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.net.NetUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+public class TestWebHdfsContentLength {
+  private static ServerSocket listenSocket;
+  private static String bindAddr;
+  private static Path p;
+  private static FileSystem fs;
+
+  private static final Pattern contentLengthPattern = Pattern.compile(
+      "^(Content-Length|Transfer-Encoding):\\s*(.*)", Pattern.MULTILINE);
+
+  private static String errResponse =
+      "HTTP/1.1 500 Boom\r\n" +
+      "Content-Length: 0\r\n" +
+      "Connection: close\r\n\r\n";
+  private static String redirectResponse;
+
+  private static ExecutorService executor;
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    listenSocket = new ServerSocket();
+    listenSocket.bind(null);
+    bindAddr = NetUtils.getHostPortString(
+        (InetSocketAddress)listenSocket.getLocalSocketAddress());
+    redirectResponse =
+        "HTTP/1.1 307 Redirect\r\n" +
+        "Location: http://"+bindAddr+"/path\r\n" +
+        "Connection: close\r\n\r\n";
+
+    p = new Path("webhdfs://"+bindAddr+"/path");
+    fs = p.getFileSystem(new Configuration());
+    executor = Executors.newSingleThreadExecutor();    
+  }
+  
+  @AfterClass
+  public static void teardown() throws IOException {
+    if (listenSocket != null) {
+      listenSocket.close();
+    }
+    if (executor != null) {
+      executor.shutdownNow();
+    }
+  }
+  
+  @Test
+  public void testGetOp() throws Exception {
+    Future<String> future = contentLengthFuture(errResponse);
+    try {
+      fs.getFileStatus(p);
+      Assert.fail();
+    } catch (IOException ioe) {} // expected
+    Assert.assertEquals(null, getContentLength(future));
+  }
+
+  @Test
+  public void testGetOpWithRedirect() {
+    Future<String> future1 = contentLengthFuture(redirectResponse);
+    Future<String> future2 = contentLengthFuture(errResponse);
+    try {
+      fs.open(p).read();
+      Assert.fail();
+    } catch (IOException ioe) {} // expected
+    Assert.assertEquals(null, getContentLength(future1));
+    Assert.assertEquals(null, getContentLength(future2));
+  }
+  
+  @Test
+  public void testPutOp() {
+    Future<String> future = contentLengthFuture(errResponse);
+    try {
+      fs.mkdirs(p);
+      Assert.fail();
+    } catch (IOException ioe) {} // expected
+    Assert.assertEquals("0", getContentLength(future));
+  }
+
+  @Test
+  public void testPutOpWithRedirect() {
+    Future<String> future1 = contentLengthFuture(redirectResponse);
+    Future<String> future2 = contentLengthFuture(errResponse);
+    try {
+      FSDataOutputStream os = fs.create(p);
+      os.write(new byte[]{0});
+      os.close();
+      Assert.fail();
+    } catch (IOException ioe) {} // expected
+    Assert.assertEquals("0", getContentLength(future1));
+    Assert.assertEquals("chunked", getContentLength(future2));
+  }
+  
+  @Test
+  public void testPostOp() {  
+    Future<String> future = contentLengthFuture(errResponse);
+    try {
+      fs.concat(p, new Path[]{p});
+      Assert.fail();
+    } catch (IOException ioe) {} // expected
+    Assert.assertEquals("0", getContentLength(future));
+  }
+  
+  @Test
+  public void testPostOpWithRedirect() {
+    // POST operation with redirect
+    Future<String> future1 = contentLengthFuture(redirectResponse);
+    Future<String> future2 = contentLengthFuture(errResponse);
+    try {
+      FSDataOutputStream os = fs.append(p);
+      os.write(new byte[]{0});
+      os.close();
+      Assert.fail();
+    } catch (IOException ioe) {} // expected
+    Assert.assertEquals("0", getContentLength(future1));
+    Assert.assertEquals("chunked", getContentLength(future2));
+  }
+  
+  @Test
+  public void testDelete() {
+    Future<String> future = contentLengthFuture(errResponse);
+    try {
+      fs.delete(p, false);
+      Assert.fail();
+    } catch (IOException ioe) {} // expected
+    Assert.assertEquals(null, getContentLength(future));
+  }  
+
+  private String getContentLength(Future<String> future)  {
+    String request = null;
+    try {
+      request = future.get(2, TimeUnit.SECONDS);
+    } catch (Exception e) {
+      Assert.fail(e.toString());
+    }
+    Matcher matcher = contentLengthPattern.matcher(request);
+    return matcher.find() ? matcher.group(2) : null;
+  }
+  
+  private Future<String> contentLengthFuture(final String response) {
+    return executor.submit(new Callable<String>() {
+      @Override
+      public String call() throws Exception {
+        Socket client = listenSocket.accept();
+        client.setSoTimeout(2000);
+        try {
+          client.getOutputStream().write(response.getBytes());
+          client.shutdownOutput();
+          byte[] buf = new byte[4*1024]; // much bigger than request
+          int n = client.getInputStream().read(buf);
+          return new String(buf, 0, n);
+        } finally {
+          client.close();
+        }
+      }
+    });
+  }
+}