
HDFS-3598. WebHDFS support for file concat. Contributed by Plamen Jeliazkov.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1440290 13f79535-47bb-0310-9956-ffa450edef68
Konstantin Shvachko 12 years ago
parent commit 481b6cccf0

+ 11 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java

@@ -1128,6 +1128,17 @@ public abstract class FileSystem extends Configured implements Closeable {
   public abstract FSDataOutputStream append(Path f, int bufferSize,
       Progressable progress) throws IOException;
 
+  /**
+   * Concat existing files together.
+   * @param trg the path to the target destination.
+   * @param psrcs the paths to the sources to use for the concatenation.
+   * @throws IOException
+   */
+  public void concat(final Path trg, final Path [] psrcs) throws IOException {
+    throw new UnsupportedOperationException("Not implemented by the " + 
+        getClass().getSimpleName() + " FileSystem implementation");
+  }
+
   /**
    * Get replication.
    * 

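Note on the new API: the base FileSystem#concat deliberately throws UnsupportedOperationException, so only implementations that override it (DistributedFileSystem, and with this patch WebHdfsFileSystem) accept the call. A minimal usage sketch, assuming a default HDFS configuration and hypothetical /data paths whose files share one block size:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ConcatExample {
  public static void main(String[] args) throws Exception {
    // Assumes fs.defaultFS points at an HDFS cluster; the paths are hypothetical.
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    Path target = new Path("/data/part-0");
    Path[] sources = { new Path("/data/part-1"), new Path("/data/part-2") };

    // Moves the source blocks onto the end of the target and deletes the
    // sources; file systems that do not override concat() throw
    // UnsupportedOperationException, per the default added above.
    fs.concat(target, sources);
  }
}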
+ 5 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java

@@ -159,6 +159,11 @@ public class FilterFileSystem extends FileSystem {
     return fs.append(f, bufferSize, progress);
   }
 
+  @Override
+  public void concat(Path f, Path[] psrcs) throws IOException {
+    fs.concat(f, psrcs);
+  }
+
   @Override
   public FSDataOutputStream create(Path f, FsPermission permission,
       boolean overwrite, int bufferSize, short replication, long blockSize,

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -499,6 +499,8 @@ Release 2.0.3-alpha - Unreleased
 
     HDFS-4259. Improve pipeline DN replacement failure message (harsh)
 
+    HDFS-3598. WebHDFS support for file concat. (Plamen Jeliazkov via shv)
+
   OPTIMIZATIONS
 
     HDFS-3429. DataNode reads checksums even if client does not need them (todd)

+ 2 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java

@@ -310,10 +310,9 @@ public class DistributedFileSystem extends FileSystem {
   }
   
   /**
-   * THIS IS DFS only operations, it is not part of FileSystem
-   * move blocks from srcs to trg
+   * Move blocks from srcs to trg
    * and delete srcs afterwards
-   * all blocks should be the same size
+   * RESTRICTION: all blocks should be the same size
    * @param trg existing file to append to
    * @param psrcs list of files (same block size, same replication)
    * @throws IOException

+ 14 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java

@@ -70,6 +70,7 @@ import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.hdfs.web.resources.AccessTimeParam;
 import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
 import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
+import org.apache.hadoop.hdfs.web.resources.ConcatSourcesParam;
 import org.apache.hadoop.hdfs.web.resources.CreateParentParam;
 import org.apache.hadoop.hdfs.web.resources.DelegationParam;
 import org.apache.hadoop.hdfs.web.resources.DeleteOpParam;
@@ -483,10 +484,12 @@ public class NamenodeWebHdfsMethods {
           final DoAsParam doAsUser,
       @QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT)
           final PostOpParam op,
+      @QueryParam(ConcatSourcesParam.NAME) @DefaultValue(ConcatSourcesParam.DEFAULT)
+          final ConcatSourcesParam concatSrcs,
       @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
           final BufferSizeParam bufferSize
       ) throws IOException, InterruptedException {
-    return post(ugi, delegation, username, doAsUser, ROOT, op, bufferSize);
+    return post(ugi, delegation, username, doAsUser, ROOT, op, concatSrcs, bufferSize);
   }
 
   /** Handle HTTP POST request. */
@@ -505,11 +508,13 @@ public class NamenodeWebHdfsMethods {
       @PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
       @QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT)
           final PostOpParam op,
+      @QueryParam(ConcatSourcesParam.NAME) @DefaultValue(ConcatSourcesParam.DEFAULT)
+          final ConcatSourcesParam concatSrcs,
       @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
           final BufferSizeParam bufferSize
       ) throws IOException, InterruptedException {
 
-    init(ugi, delegation, username, doAsUser, path, op, bufferSize);
+    init(ugi, delegation, username, doAsUser, path, op, concatSrcs, bufferSize);
 
     return ugi.doAs(new PrivilegedExceptionAction<Response>() {
       @Override
@@ -517,7 +522,7 @@ public class NamenodeWebHdfsMethods {
         REMOTE_ADDRESS.set(request.getRemoteAddr());
         try {
           return post(ugi, delegation, username, doAsUser,
-              path.getAbsolutePath(), op, bufferSize);
+              path.getAbsolutePath(), op, concatSrcs, bufferSize);
         } finally {
           REMOTE_ADDRESS.set(null);
         }
@@ -532,6 +537,7 @@ public class NamenodeWebHdfsMethods {
       final DoAsParam doAsUser,
       final String fullpath,
       final PostOpParam op,
+      final ConcatSourcesParam concatSrcs,
       final BufferSizeParam bufferSize
       ) throws IOException, URISyntaxException {
     final NameNode namenode = (NameNode)context.getAttribute("name.node");
@@ -543,6 +549,11 @@ public class NamenodeWebHdfsMethods {
           fullpath, op.getValue(), -1L, -1L, bufferSize);
       return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
+    case CONCAT:
+    {
+      namenode.getRpcServer().concat(fullpath, concatSrcs.getAbsolutePaths());
+      return Response.ok().build();
+    }
     default:
       throw new UnsupportedOperationException(op + " is not supported");
     }

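With this change the NameNode completes CONCAT itself and answers 200 OK, with no request body and no redirect to a DataNode. A rough sketch of the raw HTTP request, assuming a hypothetical NameNode HTTP address and pre-existing files (the srcs value is the comma-separated source list handled by ConcatSourcesParam below):

import java.net.HttpURLConnection;
import java.net.URL;

public class RawWebHdfsConcat {
  public static void main(String[] args) throws Exception {
    // Hypothetical host and paths; /webhdfs/v1 is the standard WebHDFS prefix.
    URL url = new URL("http://namenode.example.com:50070/webhdfs/v1/user/plamen/target"
        + "?op=CONCAT&srcs=/user/plamen/part-1,/user/plamen/part-2");

    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("POST");  // CONCAT is a POST op that sends no body
    // The NameNode performs the concat via its RPC server and replies 200 OK
    // directly, unlike APPEND, which redirects the client to a DataNode.
    System.out.println("HTTP " + conn.getResponseCode());
    conn.disconnect();
  }
}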
+ 20 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java

@@ -29,7 +29,9 @@ import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.StringTokenizer;
 
@@ -65,6 +67,7 @@ import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.hdfs.web.resources.AccessTimeParam;
 import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
 import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
+import org.apache.hadoop.hdfs.web.resources.ConcatSourcesParam;
 import org.apache.hadoop.hdfs.web.resources.CreateParentParam;
 import org.apache.hadoop.hdfs.web.resources.DeleteOpParam;
 import org.apache.hadoop.hdfs.web.resources.DestinationParam;
@@ -103,6 +106,7 @@ import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.TokenRenewer;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
 import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.StringUtils;
 import org.mortbay.util.ajax.JSON;
 
 import com.google.common.base.Charsets;
@@ -716,6 +720,22 @@ public class WebHdfsFileSystem extends FileSystem
     };
   }
 
+  @Override
+  public void concat(final Path trg, final Path [] psrcs) throws IOException {
+    statistics.incrementWriteOps(1);
+    final HttpOpParam.Op op = PostOpParam.Op.CONCAT;
+
+    List<String> strPaths = new ArrayList<String>(psrcs.length);
+    for(Path psrc : psrcs) {
+       strPaths.add(psrc.toUri().getPath());
+    }
+
+    String srcs = StringUtils.join(",", strPaths);
+
+    ConcatSourcesParam param = new ConcatSourcesParam(srcs);
+    run(op, trg, param);
+  }
+
   @Override
   public FSDataOutputStream create(final Path f, final FsPermission permission,
       final boolean overwrite, final int bufferSize, final short replication,

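On the client side, concat is now reachable through a webhdfs:// URI as well; the override above joins the source paths into the single srcs query parameter and issues the POST. A hedged usage sketch with a hypothetical NameNode address (requires dfs.webhdfs.enabled=true, as in the test below):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WebHdfsConcatClient {
  public static void main(String[] args) throws Exception {
    // Hypothetical NameNode HTTP address and paths.
    Configuration conf = new Configuration();
    FileSystem webhdfs = FileSystem.get(
        URI.create("webhdfs://namenode.example.com:50070"), conf);

    Path target = new Path("/user/plamen/target");
    Path[] sources = { new Path("/user/plamen/part-1"),
                       new Path("/user/plamen/part-2") };

    // Translates to POST ...?op=CONCAT&srcs=/user/plamen/part-1,/user/plamen/part-2
    webhdfs.concat(target, sources);
  }
}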
+ 48 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ConcatSourcesParam.java

@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.web.resources;
+
+/** The concat source paths parameter. */
+public class ConcatSourcesParam extends StringParam {
+  /** Parameter name. */
+  public static final String NAME = "srcs";
+
+  public static final String DEFAULT = NULL;
+
+  private static final Domain DOMAIN = new Domain(NAME, null);
+
+  /**
+   * Constructor.
+   * @param str a string representation of the parameter value.
+   */
+  public ConcatSourcesParam(String str) {
+    super(DOMAIN, str);
+  }
+
+  @Override
+  public String getName() {
+    return NAME;
+  }
+
+  /** @return the absolute path. */
+  public final String[] getAbsolutePaths() {
+    final String[] paths = getValue().split(",");
+    return paths;
+  }
+}

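ConcatSourcesParam only carries the comma-separated list through the query string and splits it back into absolute paths on the server side. A small illustrative snippet (values are hypothetical):

import org.apache.hadoop.hdfs.web.resources.ConcatSourcesParam;

// The NameNode receives srcs=/a/part-1,/a/part-2 and recovers the paths:
ConcatSourcesParam srcs = new ConcatSourcesParam("/a/part-1,/a/part-2");
String[] paths = srcs.getAbsolutePaths();  // ["/a/part-1", "/a/part-2"]
assert "srcs".equals(srcs.getName());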
+ 9 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PostOpParam.java

@@ -23,13 +23,17 @@ import java.net.HttpURLConnection;
 public class PostOpParam extends HttpOpParam<PostOpParam.Op> {
   /** Post operations. */
   public static enum Op implements HttpOpParam.Op {
-    APPEND(HttpURLConnection.HTTP_OK),
+    APPEND(true, HttpURLConnection.HTTP_OK),
 
-    NULL(HttpURLConnection.HTTP_NOT_IMPLEMENTED);
+    CONCAT(false, HttpURLConnection.HTTP_OK),
 
+    NULL(false, HttpURLConnection.HTTP_NOT_IMPLEMENTED);
+
+    final boolean doOutputAndRedirect;
     final int expectedHttpResponseCode;
 
-    Op(final int expectedHttpResponseCode) {
+    Op(final boolean doOutputAndRedirect, final int expectedHttpResponseCode) {
+      this.doOutputAndRedirect = doOutputAndRedirect;
       this.expectedHttpResponseCode = expectedHttpResponseCode;
     }
 
@@ -40,12 +44,12 @@ public class PostOpParam extends HttpOpParam<PostOpParam.Op> {
 
     @Override
     public boolean getDoOutput() {
-      return true;
+      return doOutputAndRedirect;
     }
 
     @Override
     public boolean getRedirect() {
-      return true;
+      return doOutputAndRedirect;
     }
 
     @Override

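The new constructor flag is needed because APPEND streams a request body and is redirected to a DataNode, while CONCAT carries no body and is answered by the NameNode directly, so getDoOutput() and getRedirect() must both report false for it. A quick sanity-check sketch of the resulting enum behaviour:

import org.apache.hadoop.hdfs.web.resources.PostOpParam;

public class PostOpSemantics {
  public static void main(String[] args) {
    // APPEND writes data and follows a temporary redirect to a DataNode.
    assert PostOpParam.Op.APPEND.getDoOutput();
    assert PostOpParam.Op.APPEND.getRedirect();

    // CONCAT sends no body and is completed by the NameNode (200 OK).
    assert !PostOpParam.Op.CONCAT.getDoOutput();
    assert !PostOpParam.Op.CONCAT.getRedirect();
  }
}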
+ 27 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestFSMainOperationsWebHdfs.java

@@ -27,11 +27,13 @@ import java.security.PrivilegedExceptionAction;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSMainOperationsBaseTest;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods;
 import org.apache.hadoop.hdfs.web.resources.ExceptionHandler;
@@ -60,6 +62,7 @@ public class TestFSMainOperationsWebHdfs extends FSMainOperationsBaseTest {
 
     final Configuration conf = new Configuration();
     conf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
     try {
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
       cluster.waitActive();
@@ -101,6 +104,30 @@ public class TestFSMainOperationsWebHdfs extends FSMainOperationsBaseTest {
     return defaultWorkingDirectory;
   }
 
+  @Test
+  public void testConcat() throws Exception {
+    Path[] paths = {new Path("/test/hadoop/file1"),
+                    new Path("/test/hadoop/file2"),
+                    new Path("/test/hadoop/file3")};
+
+    DFSTestUtil.createFile(fSys, paths[0], 1024, (short) 3, 0);
+    DFSTestUtil.createFile(fSys, paths[1], 1024, (short) 3, 0);
+    DFSTestUtil.createFile(fSys, paths[2], 1024, (short) 3, 0);
+
+    Path catPath = new Path("/test/hadoop/catFile");
+    DFSTestUtil.createFile(fSys, catPath, 1024, (short) 3, 0);
+    Assert.assertTrue(exists(fSys, catPath));
+
+    fSys.concat(catPath, paths);
+
+    Assert.assertFalse(exists(fSys, paths[0]));
+    Assert.assertFalse(exists(fSys, paths[1]));
+    Assert.assertFalse(exists(fSys, paths[2]));
+
+    FileStatus fileStatus = fSys.getFileStatus(catPath);
+    Assert.assertEquals(1024*4, fileStatus.getLen());
+  }
+
   @Override
   @Test
   public void testMkdirsFailsForSubdirectoryOfExistingFile() throws Exception {