svn merge -c 1455883 from branch-1 for HDFS-4597. Backport WebHDFS concat.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.2@1455884 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 12 years ago
commit 2e4236bd30

+ 2 - 0
CHANGES.txt

@@ -61,6 +61,8 @@ Release 1.2.0 - unreleased
     HDFS-4256 Backport concatenation of files into a single file to branch-1
     (sanjay Radia)
 
+    HDFS-4597. Backport WebHDFS concat.  (szetszwo)
+
   IMPROVEMENTS
 
     HDFS-3515. Port HDFS-1457 to branch-1. (eli)

+ 11 - 0
src/core/org/apache/hadoop/fs/FileSystem.java

@@ -669,6 +669,17 @@ public abstract class FileSystem extends Configured implements Closeable {
    */
   public abstract FSDataOutputStream append(Path f, int bufferSize,
       Progressable progress) throws IOException;
+
+  /**
+   * Concat existing files together.
+   * @param trg the path to the target destination.
+   * @param srcs the paths to the sources to use for the concatenation.
+   * @throws IOException
+   */
+  public void concat(final Path trg, final Path [] srcs) throws IOException {
+    throw new UnsupportedOperationException("Not implemented by the " + 
+        getClass().getSimpleName() + " FileSystem implementation");
+  }
   
   /**
    * Get replication.

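The new FileSystem.concat API can be exercised as in the following minimal sketch (class name, NameNode address and paths are illustrative; the concrete FileSystem must override concat, e.g. DistributedFileSystem or WebHdfsFileSystem, since the default shown above throws UnsupportedOperationException):

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ConcatExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Illustrative NameNode URI; any FileSystem that overrides concat will do.
    FileSystem fs = FileSystem.get(URI.create("hdfs://namenode.example.com:8020"), conf);

    Path target = new Path("/user/example/part-0");
    Path[] sources = { new Path("/user/example/part-1"),
                       new Path("/user/example/part-2") };

    // Moves the blocks of the sources onto the target and deletes the sources.
    fs.concat(target, sources);
  }
}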
+ 5 - 1
src/core/org/apache/hadoop/fs/FilterFileSystem.java

@@ -114,7 +114,11 @@ public class FilterFileSystem extends FileSystem {
     return fs.append(f, bufferSize, progress);
   }
 
-  /** {@inheritDoc} */
+  @Override
+  public void concat(Path f, Path[] psrcs) throws IOException {
+    fs.concat(f, psrcs);
+  }
+
   @Override
   public FSDataOutputStream create(Path f, FsPermission permission,
       boolean overwrite, int bufferSize, short replication, long blockSize,

+ 1 - 0
src/docs/src/documentation/content/xdocs/site.xml

@@ -166,6 +166,7 @@ See http://forrest.apache.org/docs/linking.html for more info.
                 <setTimes href="#setTimes(org.apache.hadoop.fs.Path,%20long,%20long)" />
 
                 <append href="#append(org.apache.hadoop.fs.Path,%20int,%20org.apache.hadoop.util.Progressable)" />
+                <concat href="#concat(org.apache.hadoop.fs.Path,%20org.apache.hadoop.fs.Path[])" />
                 <delete href="#delete(org.apache.hadoop.fs.Path,%20boolean)" />
               </filesystem>
             </fs>

+ 41 - 0
src/docs/src/documentation/content/xdocs/webhdfs.xml

@@ -104,6 +104,9 @@
     <li><a href="#APPEND"><code>APPEND</code></a>
         (see <a href="ext:api/org/apache/hadoop/fs/filesystem/append">FileSystem.append</a>)
     </li>
+    <li><a href="#CONCAT"><code>CONCAT</code></a>
+        (see <a href="ext:api/org/apache/hadoop/fs/filesystem/concat">FileSystem.concat</a>)
+    </li>
   </ul></li>
   <li>HTTP DELETE
   <ul>
@@ -310,6 +313,28 @@ Content-Length: 0
   <a href="ext:api/org/apache/hadoop/fs/filesystem/append">FileSystem.append</a>
 </p>
       </section>
+<!-- ***************************************************************************** -->
+      <section id="CONCAT">
+        <title>Concatenate Files</title>
+<ul>
+  <li>Submit an HTTP POST request.
+    <source>
+        curl -i -X POST "http://&lt;HOST&gt;:&lt;PORT&gt;/webhdfs/v1/&lt;PATH&gt;?op=CONCAT&amp;sources=&lt;PATHS&gt;"
+    </source>
+The client receives a response with zero content length:
+    <source>
+HTTP/1.1 200 OK
+Content-Length: 0
+    </source>
+  </li>
+</ul>
+<p>
+  See also:
+  <a href="#sources"><code>sources</code></a>,
+  <a href="ext:api/org/apache/hadoop/fs/filesystem/concat">FileSystem.concat</a>
+</p>
+      </section>
+
 <!-- ***************************************************************************** -->
       <section id="OPEN">
         <title>Open and Read a File</title>
@@ -1534,6 +1559,22 @@ var fileStatusProperties =
   <a href="#SETREPLICATION"><code>SETREPLICATION</code></a>
 </p>
       </section>
+<!-- ***************************************************************************** -->
+      <section id="sources">
+        <title>Sources</title>
+<table>
+  <tr><td>Name</td><td><code>sources</code></td></tr>
+  <tr><td>Description</td><td>A list of source paths.</td></tr>
+  <tr><td>Type</td><td>String</td></tr>
+  <tr><td>Default Value</td><td>&lt;empty&gt;</td></tr>
+  <tr><td>Valid Values</td><td>A list of comma-separated absolute FileSystem paths without scheme and authority.</td></tr>
+  <tr><td>Syntax</td><td>Any string.</td></tr>
+</table>
+<p>
+  See also:
+  <a href="#CONCAT"><code>CONCAT</code></a>
+</p>
+      </section>
 <!-- ***************************************************************************** -->
       <section id="token">
         <title>Token</title>

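The curl request documented above maps directly onto plain java.net calls. A hedged sketch (host, port and paths are placeholders; authentication and URL-encoding of the source list are omitted):

import java.net.HttpURLConnection;
import java.net.URL;

public class WebHdfsConcatRequest {
  public static void main(String[] args) throws Exception {
    // op=CONCAT with a comma-separated "sources" list, as described in the Sources section.
    URL url = new URL("http://namenode.example.com:50070/webhdfs/v1/user/example/target"
        + "?op=CONCAT&sources=/user/example/part-1,/user/example/part-2");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("POST");
    // A 200 OK with Content-Length: 0 indicates the concat succeeded.
    System.out.println(conn.getResponseCode() + " " + conn.getResponseMessage());
    conn.disconnect();
  }
}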
+ 4 - 4
src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java

@@ -220,14 +220,14 @@ public class DistributedFileSystem extends FileSystem {
   }
 
   /**
-   * THIS IS DFS only operations, it is not part of FileSystem
-   * move blocks from srcs to trg
-   * and delete srcs afterwards
-   * all blocks should be the same size
+   * Move blocks from srcs to trg and delete srcs afterwards.
+   * The file block sizes must be the same.
+   *
    * @param trg existing file to append to
    * @param psrcs list of files (same block size, same replication)
    * @throws IOException
    */
+  @Override
   public void concat(Path trg, Path [] psrcs) throws IOException {
     String [] srcs = new String [psrcs.length];
     for(int i=0; i<psrcs.length; i++) {

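As the javadoc notes, concat is a metadata-only operation: the NameNode moves the blocks of the source files onto the target and then deletes the sources, which is why all files involved must share the same block size and replication.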
+ 12 - 2
src/hdfs/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java

@@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.hdfs.web.resources.AccessTimeParam;
 import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
 import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
+import org.apache.hadoop.hdfs.web.resources.ConcatSourcesParam;
 import org.apache.hadoop.hdfs.web.resources.DelegationParam;
 import org.apache.hadoop.hdfs.web.resources.DeleteOpParam;
 import org.apache.hadoop.hdfs.web.resources.DestinationParam;
@@ -401,10 +402,12 @@ public class NamenodeWebHdfsMethods {
           final DoAsParam doAsUser,
       @QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT)
           final PostOpParam op,
+      @QueryParam(ConcatSourcesParam.NAME) @DefaultValue(ConcatSourcesParam.DEFAULT)
+          final ConcatSourcesParam concatSrcs,
       @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
           final BufferSizeParam bufferSize
       ) throws IOException, InterruptedException {
-    return post(ugi, delegation, username, doAsUser, ROOT, op, bufferSize);
+    return post(ugi, delegation, username, doAsUser, ROOT, op, concatSrcs, bufferSize);
   }
 
   /** Handle HTTP POST request. */
@@ -423,11 +426,13 @@ public class NamenodeWebHdfsMethods {
       @PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
       @QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT)
           final PostOpParam op,
+      @QueryParam(ConcatSourcesParam.NAME) @DefaultValue(ConcatSourcesParam.DEFAULT)
+          final ConcatSourcesParam concatSrcs,
       @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
           final BufferSizeParam bufferSize
       ) throws IOException, InterruptedException {
 
-    init(ugi, delegation, username, doAsUser, path, op, bufferSize);
+    init(ugi, delegation, username, doAsUser, path, op, concatSrcs, bufferSize);
 
     return ugi.doAs(new PrivilegedExceptionAction<Response>() {
       @Override
@@ -445,6 +450,11 @@ public class NamenodeWebHdfsMethods {
           fullpath, op.getValue(), -1L, -1L, bufferSize);
       return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
+    case CONCAT:
+    {
+      namenode.concat(fullpath, concatSrcs.getAbsolutePaths());
+      return Response.ok().build();
+    }
     default:
       throw new UnsupportedOperationException(op + " is not supported");
     }

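Because concat is served entirely by the NameNode, the CONCAT case above calls namenode.concat with the parsed source paths and returns an immediate 200 OK with no body; unlike APPEND, there is no redirect to a DataNode and no file data is streamed.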
+ 10 - 0
src/hdfs/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java

@@ -61,6 +61,7 @@ import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.hdfs.web.resources.AccessTimeParam;
 import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
 import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
+import org.apache.hadoop.hdfs.web.resources.ConcatSourcesParam;
 import org.apache.hadoop.hdfs.web.resources.DeleteOpParam;
 import org.apache.hadoop.hdfs.web.resources.DestinationParam;
 import org.apache.hadoop.hdfs.web.resources.GetOpParam;
@@ -672,6 +673,15 @@ public class WebHdfsFileSystem extends FileSystem
     };
   }
 
+  @Override
+  public void concat(final Path trg, final Path [] srcs) throws IOException {
+    statistics.incrementWriteOps(1);
+    final HttpOpParam.Op op = PostOpParam.Op.CONCAT;
+
+    ConcatSourcesParam param = new ConcatSourcesParam(srcs);
+    run(op, trg, param);
+  }
+
   @Override
   public FSDataOutputStream create(final Path f, final FsPermission permission,
       final boolean overwrite, final int bufferSize, final short replication,

+ 65 - 0
src/hdfs/org/apache/hadoop/hdfs/web/resources/ConcatSourcesParam.java

@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.web.resources;
+
+import org.apache.hadoop.fs.Path;
+
+/** The concat source paths parameter. */
+public class ConcatSourcesParam extends StringParam {
+  /** Parameter name. */
+  public static final String NAME = "sources";
+
+  public static final String DEFAULT = "";
+
+  private static final Domain DOMAIN = new Domain(NAME, null);
+
+  private static String paths2String(Path[] paths) {
+    if (paths == null || paths.length == 0) {
+      return "";
+    }
+    final StringBuilder b = new StringBuilder(paths[0].toUri().getPath());
+    for(int i = 1; i < paths.length; i++) {
+      b.append(',').append(paths[i].toUri().getPath());
+    }
+    return b.toString();
+  }
+
+  /**
+   * Constructor.
+   * @param str a string representation of the parameter value.
+   */
+  public ConcatSourcesParam(String str) {
+    super(DOMAIN, str);
+  }
+
+  public ConcatSourcesParam(Path[] paths) {
+    this(paths2String(paths));
+  }
+
+  @Override
+  public String getName() {
+    return NAME;
+  }
+
+  /** @return the absolute source paths. */
+  public final String[] getAbsolutePaths() {
+    final String[] paths = getValue().split(",");
+    return paths;
+  }
+}

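To make the wire format concrete, the following standalone sketch (class name is hypothetical) mirrors the round trip performed by ConcatSourcesParam: paths2String joins the URI paths with commas, and getAbsolutePaths splits them back apart.

import org.apache.hadoop.fs.Path;

public class ConcatSourcesRoundTrip {
  public static void main(String[] args) {
    Path[] srcs = { new Path("/test/hadoop/file2"), new Path("/test/hadoop/file3") };

    // Serialize: join the URI paths with commas, as paths2String does.
    StringBuilder b = new StringBuilder(srcs[0].toUri().getPath());
    for (int i = 1; i < srcs.length; i++) {
      b.append(',').append(srcs[i].toUri().getPath());
    }
    String sources = b.toString();  // "/test/hadoop/file2,/test/hadoop/file3"

    // Parse: split on commas, as getAbsolutePaths does.
    String[] absolutePaths = sources.split(",");
    System.out.println(sources + " -> " + absolutePaths.length + " source paths");
  }
}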
+ 9 - 5
src/hdfs/org/apache/hadoop/hdfs/web/resources/PostOpParam.java

@@ -23,13 +23,17 @@ import java.net.HttpURLConnection;
 public class PostOpParam extends HttpOpParam<PostOpParam.Op> {
   /** Post operations. */
   public static enum Op implements HttpOpParam.Op {
-    APPEND(HttpURLConnection.HTTP_OK),
+    APPEND(true, HttpURLConnection.HTTP_OK),
 
 
-    NULL(HttpURLConnection.HTTP_NOT_IMPLEMENTED);
+    CONCAT(false, HttpURLConnection.HTTP_OK),
 
+    NULL(false, HttpURLConnection.HTTP_NOT_IMPLEMENTED);
+
+    final boolean doOutputAndRedirect;
     final int expectedHttpResponseCode;
 
-    Op(final int expectedHttpResponseCode) {
+    Op(final boolean doOutputAndRedirect, final int expectedHttpResponseCode) {
+      this.doOutputAndRedirect = doOutputAndRedirect;
       this.expectedHttpResponseCode = expectedHttpResponseCode;
     }
 
@@ -40,12 +44,12 @@ public class PostOpParam extends HttpOpParam<PostOpParam.Op> {
 
     @Override
     public boolean getDoOutput() {
-      return true;
+      return doOutputAndRedirect;
     }
 
     @Override
     public boolean getRedirect() {
-      return true;
+      return doOutputAndRedirect;
     }
 
     @Override

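The new doOutputAndRedirect flag captures the difference between the two POST operations: APPEND writes file data, so the client opens an output stream and follows a redirect to a DataNode, whereas CONCAT carries everything in the query string and is answered by the NameNode directly, so getDoOutput() and getRedirect() both return false for it.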
+ 27 - 0
src/test/org/apache/hadoop/hdfs/web/TestFSMainOperationsWebHdfs.java

@@ -28,10 +28,12 @@ import java.security.PrivilegedExceptionAction;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSMainOperationsBaseTest;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods;
 import org.apache.hadoop.hdfs.web.resources.ExceptionHandler;
@@ -56,6 +58,7 @@ public class TestFSMainOperationsWebHdfs extends FSMainOperationsBaseTest {
   public static void setupCluster() {
     final Configuration conf = new Configuration();
     conf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
     try {
       cluster = new MiniDFSCluster(conf, 2, true, null);
       cluster.waitActive();
@@ -97,6 +100,30 @@ public class TestFSMainOperationsWebHdfs extends FSMainOperationsBaseTest {
     return defaultWorkingDirectory;
   }
 
+  @Test
+  public void testConcat() throws Exception {
+    Path[] paths = {new Path("/test/hadoop/file1"),
+                    new Path("/test/hadoop/file2"),
+                    new Path("/test/hadoop/file3")};
+
+    DFSTestUtil.createFile(fSys, paths[0], 1024, (short) 3, 0);
+    DFSTestUtil.createFile(fSys, paths[1], 1024, (short) 3, 0);
+    DFSTestUtil.createFile(fSys, paths[2], 1024, (short) 3, 0);
+
+    Path catPath = new Path("/test/hadoop/catFile");
+    DFSTestUtil.createFile(fSys, catPath, 1024, (short) 3, 0);
+    Assert.assertTrue(exists(fSys, catPath));
+
+    fSys.concat(catPath, paths);
+
+    Assert.assertFalse(exists(fSys, paths[0]));
+    Assert.assertFalse(exists(fSys, paths[1]));
+    Assert.assertFalse(exists(fSys, paths[2]));
+
+    FileStatus fileStatus = fSys.getFileStatus(catPath);
+    Assert.assertEquals(1024*4, fileStatus.getLen());
+  }
+
   //copied from trunk.
   @Test
   public void testMkdirsFailsForSubdirectoryOfExistingFile() throws Exception {