HDFS-7945. The WebHdfs system on DN does not honor the length parameter. Contributed by Haohui Mai.

Haohui Mai, 10 years ago
commit 8c40e88d5d

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -1194,6 +1194,9 @@ Release 2.7.0 - UNRELEASED
     default dfs.journalnode.http-address port 8480 is in use. (Xiaoyu Yao via
     Arpit Agarwal)

+    HDFS-7945. The WebHdfs system on DN does not honor the length parameter.
+    (wheat9)
+
     BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS

       HDFS-7720. Quota by Storage Type API, tools and ClientNameNode

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/ParameterParser.java

@@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
 import org.apache.hadoop.hdfs.web.resources.DelegationParam;
 import org.apache.hadoop.hdfs.web.resources.DoAsParam;
 import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
+import org.apache.hadoop.hdfs.web.resources.LengthParam;
 import org.apache.hadoop.hdfs.web.resources.NamenodeAddressParam;
 import org.apache.hadoop.hdfs.web.resources.OffsetParam;
 import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
@@ -65,6 +66,10 @@ class ParameterParser {
     return new OffsetParam(param(OffsetParam.NAME)).getOffset();
   }

+  long length() {
+    return new LengthParam(param(LengthParam.NAME)).getLength();
+  }
+
   String namenodeId() {
     return new NamenodeAddressParam(param(NamenodeAddressParam.NAME))
       .getValue();
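
(Note, not part of the patch: for orientation, this is the kind of DataNode-side URL the parser handles after the NameNode redirects an OPEN request; the host names, the 50075 port, and the parameter values here are illustrative only:

http://dn-host:50075/webhdfs/v1/foo?op=OPEN&namenoderpcaddress=nn-host:8020&offset=42&length=512

For such a request, offset() yields 42 and the new length() accessor yields 512. When &length= is absent, LengthParam.getLength() falls back to -1, which the handler change below treats as "no limit".)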

+ 14 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/WebHdfsHandler.java

@@ -47,8 +47,10 @@ import org.apache.hadoop.hdfs.web.resources.PutOpParam;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.LimitInputStream;

 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -188,6 +190,7 @@ public class WebHdfsHandler extends SimpleChannelInboundHandler<HttpRequest> {
     final String nnId = params.namenodeId();
     final int bufferSize = params.bufferSize();
     final long offset = params.offset();
+    final long length = params.length();

     DefaultHttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
     HttpHeaders headers = response.headers();
@@ -202,12 +205,20 @@ public class WebHdfsHandler extends SimpleChannelInboundHandler<HttpRequest> {
       dfsclient.open(path, bufferSize, true));
     in.seek(offset);

-    if (in.getVisibleLength() >= offset) {
-      headers.set(CONTENT_LENGTH, in.getVisibleLength() - offset);
+    long contentLength = in.getVisibleLength() - offset;
+    if (length >= 0) {
+      contentLength = Math.min(contentLength, length);
+    }
+    final InputStream data;
+    if (contentLength >= 0) {
+      headers.set(CONTENT_LENGTH, contentLength);
+      data = new LimitInputStream(in, contentLength);
+    } else {
+      data = in;
     }

     ctx.write(response);
-    ctx.writeAndFlush(new ChunkedStream(in) {
+    ctx.writeAndFlush(new ChunkedStream(data) {
       @Override
       public void close() throws Exception {
         super.close();
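
(To make the new branching easier to check, here is a hedged sketch, not part of the patch, of the same arithmetic as a pure function; visibleLength and offset come from the opened stream, and length is -1 when the client omitted the parameter:

class ContentLengthSketch {
  // Mirrors the handler's logic: bytes remaining after the seek, optionally
  // clamped by the client-supplied length.
  static long contentLength(long visibleLength, long offset, long length) {
    long contentLength = visibleLength - offset;
    if (length >= 0) {
      contentLength = Math.min(contentLength, length);
    }
    // Negative means the offset lies past the visible end; the handler then
    // skips the CONTENT_LENGTH header and streams the file unbounded.
    return contentLength;
  }

  public static void main(String[] args) {
    System.out.println(contentLength(1024, 42, 512));  // 512 (clamped to length)
    System.out.println(contentLength(1024, 42, -1));   // 982 (full remainder)
    System.out.println(contentLength(1024, 2000, -1)); // -976 (offset past EOF)
  }
}

When the result is non-negative, the handler additionally wraps the stream in LimitInputStream so the body sent can never exceed the advertised Content-Length.)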

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/LengthParam.java

@@ -46,4 +46,9 @@ public class LengthParam extends LongParam {
   public String getName() {
     return NAME;
   }
+
+  public long getLength() {
+    Long v = getValue();
+    return v == null ? -1 : v;
+  }
 }
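
(A hedged illustration of the -1 sentinel, with names of my own choosing rather than the patch's: LongParam.getValue() yields null when the query string omits the parameter, and getLength() normalizes that for the DN handler:

class GetLengthSketch {
  // Mirrors LengthParam.getLength(): a missing parameter (null) becomes -1,
  // the sentinel WebHdfsHandler checks before clamping the content length.
  static long getLength(Long value) {
    return value == null ? -1 : value;
  }

  public static void main(String[] args) {
    System.out.println(getLength(null)); // -1: &length= absent from the URL
    System.out.println(getLength(512L)); // 512: &length=512 supplied
  }
}

)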

+ 45 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java

@@ -21,10 +21,15 @@ package org.apache.hadoop.hdfs.web;
 import static org.junit.Assert.fail;

 import java.io.IOException;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.InetSocketAddress;
 import java.net.URISyntaxException;
+import java.net.URL;
 import java.security.PrivilegedExceptionAction;
 import java.util.Random;

+import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
@@ -45,6 +50,9 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
 import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.hdfs.web.resources.LengthParam;
+import org.apache.hadoop.hdfs.web.resources.OffsetParam;
+import org.apache.hadoop.hdfs.web.resources.Param;
 import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -523,4 +531,41 @@ public class TestWebHDFS {
       }
     }
   }
+
+  @Test
+  public void testWebHdfsOffsetAndLength() throws Exception{
+    MiniDFSCluster cluster = null;
+    final Configuration conf = WebHdfsTestUtil.createConf();
+    final int OFFSET = 42;
+    final int LENGTH = 512;
+    final String PATH = "/foo";
+    byte[] CONTENTS = new byte[1024];
+    RANDOM.nextBytes(CONTENTS);
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      final WebHdfsFileSystem fs =
+          WebHdfsTestUtil.getWebHdfsFileSystem(conf, WebHdfsFileSystem.SCHEME);
+      try (OutputStream os = fs.create(new Path(PATH))) {
+        os.write(CONTENTS);
+      }
+      InetSocketAddress addr = cluster.getNameNode().getHttpAddress();
+      URL url = new URL("http", addr.getHostString(), addr
+          .getPort(), WebHdfsFileSystem.PATH_PREFIX + PATH + "?op=OPEN" +
+          Param.toSortedString("&", new OffsetParam((long) OFFSET),
+                               new LengthParam((long) LENGTH))
+      );
+      HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+      conn.setInstanceFollowRedirects(true);
+      Assert.assertEquals(LENGTH, conn.getContentLength());
+      byte[] subContents = new byte[LENGTH];
+      byte[] realContents = new byte[LENGTH];
+      System.arraycopy(CONTENTS, OFFSET, subContents, 0, LENGTH);
+      IOUtils.readFully(conn.getInputStream(), realContents);
+      Assert.assertArrayEquals(subContents, realContents);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }
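
(For what the new test asserts on the wire, a hedged illustration; the DN address comes from the NameNode's redirect and is not fixed by the patch. The request

GET /webhdfs/v1/foo?op=OPEN&length=512&offset=42 HTTP/1.1

is answered, after following the redirect to the DataNode, with

HTTP/1.1 200 OK
Content-Length: 512

followed by exactly bytes 42 through 553 of the 1024-byte /foo. Before this patch the DN ignored length=512 and returned the remaining 982 bytes.)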