Ver Fonte

HDFS-3577. In DatanodeWebHdfsMethods, use MessageBodyWriter instead of StreamingOutput, otherwise, it will fail to transfer large files. Contributed by Tsz Wo (Nicholas), SZE


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.1.0-alpha@1363601 13f79535-47bb-0310-9956-ffa450edef68
Eli Collins há 12 anos atrás
pai
commit
b2246fbe54

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -1245,6 +1245,10 @@ Release 0.23.3 - UNRELEASED
     HDFS-3037. TestMulitipleNNDataBlockScanner#testBlockScannerAfterRestart is
     racy. (atm)
 
+    HDFS-3577. In DatanodeWebHdfsMethods, use MessageBodyWriter instead of
+    StreamingOutput, otherwise, it will fail to transfer large files.
+    (szetszwo)
+
 Release 0.23.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 4 - 27
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java

@@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.datanode.web.resources;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -40,7 +39,6 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
-import javax.ws.rs.core.StreamingOutput;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -411,31 +409,10 @@ public class DatanodeWebHdfsMethods {
         IOUtils.cleanup(LOG, dfsclient);
         throw ioe;
       }
-      final HdfsDataInputStream dis = in;
-      final StreamingOutput streaming = new StreamingOutput() {
-        @Override
-        public void write(final OutputStream out) throws IOException {
-          final Long n = length.getValue();
-          HdfsDataInputStream dfsin = dis;
-          DFSClient client = dfsclient;
-          try {
-            if (n == null) {
-              IOUtils.copyBytes(dfsin, out, b);
-            } else {
-              IOUtils.copyBytes(dfsin, out, n, false);
-            }
-            dfsin.close();
-            dfsin = null;
-            dfsclient.close();
-            client = null;
-          } finally {
-            IOUtils.cleanup(LOG, dfsin);
-            IOUtils.cleanup(LOG, client);
-          }
-        }
-      };
-
-      return Response.ok(streaming).type(
+      
+      final long n = length.getValue() != null? length.getValue()
+          : in.getVisibleLength();
+      return Response.ok(new OpenEntity(in, n, dfsclient)).type(
           MediaType.APPLICATION_OCTET_STREAM).build();
     }
     case GETFILECHECKSUM:

+ 81 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/OpenEntity.java

@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.web.resources;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Type;
+
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.ext.MessageBodyWriter;
+import javax.ws.rs.ext.Provider;
+
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.apache.hadoop.io.IOUtils;
+
+/**
+ * A response entity for a HdfsDataInputStream.
+ */
+public class OpenEntity {
+  private final HdfsDataInputStream in;
+  private final long length;
+  private final DFSClient dfsclient;
+  
+  OpenEntity(final HdfsDataInputStream in, final long length,
+      final DFSClient dfsclient) {
+    this.in = in;
+    this.length = length;
+    this.dfsclient = dfsclient;
+  }
+  
+  /**
+   * A {@link MessageBodyWriter} for {@link OpenEntity}.
+   */
+  @Provider
+  public static class Writer implements MessageBodyWriter<OpenEntity> {
+
+    @Override
+    public boolean isWriteable(Class<?> clazz, Type genericType,
+        Annotation[] annotations, MediaType mediaType) {
+      return clazz == OpenEntity.class
+          && MediaType.APPLICATION_OCTET_STREAM_TYPE.isCompatible(mediaType);
+    }
+
+    @Override
+    public long getSize(OpenEntity e, Class<?> type, Type genericType,
+        Annotation[] annotations, MediaType mediaType) {
+      return e.length;
+    }
+
+    @Override
+    public void writeTo(OpenEntity e, Class<?> type, Type genericType,
+        Annotation[] annotations, MediaType mediaType,
+        MultivaluedMap<String, Object> httpHeaders, OutputStream out
+        ) throws IOException {
+      try {
+        IOUtils.copyBytes(e.in, out, e.length, false);
+      } finally {
+        IOUtils.cleanup(DatanodeWebHdfsMethods.LOG, e.in);
+        IOUtils.cleanup(DatanodeWebHdfsMethods.LOG, e.dfsclient);
+      }
+    }
+  }
+}

+ 12 - 6
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java

@@ -26,6 +26,7 @@ import java.net.HttpURLConnection;
 import java.net.URL;
 import java.util.Arrays;
 import java.util.Map;
+import java.util.Random;
 
 import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.core.MediaType;
@@ -205,15 +206,20 @@ public class TestWebHdfsFileSystemContract extends FileSystemContractBaseTest {
       assertEquals(0, count);
     }
 
+    final byte[] mydata = new byte[1 << 20];
+    new Random().nextBytes(mydata);
+
     final Path p = new Path(dir, "file");
-    createFile(p);
+    FSDataOutputStream out = fs.create(p, false, 4096, (short)3, 1L << 17);
+    out.write(mydata, 0, mydata.length);
+    out.close();
 
-    final int one_third = data.length/3;
+    final int one_third = mydata.length/3;
     final int two_third = one_third*2;
 
     { //test seek
       final int offset = one_third; 
-      final int len = data.length - offset;
+      final int len = mydata.length - offset;
       final byte[] buf = new byte[len];
 
       final FSDataInputStream in = fs.open(p);
@@ -225,13 +231,13 @@ public class TestWebHdfsFileSystemContract extends FileSystemContractBaseTest {
   
       for (int i = 0; i < buf.length; i++) {
         assertEquals("Position " + i + ", offset=" + offset + ", length=" + len,
-            data[i + offset], buf[i]);
+            mydata[i + offset], buf[i]);
       }
     }
 
     { //test position read (read the data after the two_third location)
       final int offset = two_third; 
-      final int len = data.length - offset;
+      final int len = mydata.length - offset;
       final byte[] buf = new byte[len];
 
       final FSDataInputStream in = fs.open(p);
@@ -240,7 +246,7 @@ public class TestWebHdfsFileSystemContract extends FileSystemContractBaseTest {
   
       for (int i = 0; i < buf.length; i++) {
         assertEquals("Position " + i + ", offset=" + offset + ", length=" + len,
-            data[i + offset], buf[i]);
+            mydata[i + offset], buf[i]);
       }
     }
   }