
HDFS-4905. Merging change r1510773 from trunk to branch-2.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1510780 13f79535-47bb-0310-9956-ffa450edef68
Chris Nauroth 11 years ago
parent
commit
9541a31f1d

+ 93 - 5
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java

@@ -18,18 +18,16 @@
 
 package org.apache.hadoop.fs.shell;
 
-import java.io.File;
-import java.io.IOException;
+import java.io.*;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.PathIsDirectoryException;
+import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.IOUtils;
 
 /** Various commands for copy files */
@@ -44,6 +42,7 @@ class CopyCommands {
     factory.addClass(CopyToLocal.class, "-copyToLocal");
     factory.addClass(Get.class, "-get");
     factory.addClass(Put.class, "-put");
+    factory.addClass(AppendToFile.class, "-appendToFile");
   }
 
   /** merge multiple files together */
@@ -235,4 +234,93 @@ class CopyCommands {
     public static final String USAGE = Get.USAGE;
     public static final String DESCRIPTION = "Identical to the -get command.";
   }
+
+  /**
+   *  Append the contents of one or more local files to a remote
+   *  file.
+   */
+  public static class AppendToFile extends CommandWithDestination {
+    public static final String NAME = "appendToFile";
+    public static final String USAGE = "<localsrc> ... <dst>";
+    public static final String DESCRIPTION =
+        "Appends the contents of all the given local files to the\n" +
+            "given dst file. The dst file will be created if it does\n" +
+            "not exist. If <localSrc> is -, then the input is read\n" +
+            "from stdin.";
+
+    private static final int DEFAULT_IO_LENGTH = 1024 * 1024;
+    boolean readStdin = false;
+
+    // commands operating on local paths have no need for glob expansion
+    @Override
+    protected List<PathData> expandArgument(String arg) throws IOException {
+      List<PathData> items = new LinkedList<PathData>();
+      if (arg.equals("-")) {
+        readStdin = true;
+      } else {
+        try {
+          items.add(new PathData(new URI(arg), getConf()));
+        } catch (URISyntaxException e) {
+          if (Path.WINDOWS) {
+            // Unlike URI, PathData knows how to parse Windows drive-letter paths.
+            items.add(new PathData(arg, getConf()));
+          } else {
+            throw new IOException("Unexpected URISyntaxException: " + e.toString());
+          }
+        }
+      }
+      return items;
+    }
+
+    @Override
+    protected void processOptions(LinkedList<String> args)
+        throws IOException {
+
+      if (args.size() < 2) {
+        throw new IOException("missing destination argument");
+      }
+
+      getRemoteDestination(args);
+      super.processOptions(args);
+    }
+
+    @Override
+    protected void processArguments(LinkedList<PathData> args)
+        throws IOException {
+
+      if (!dst.exists) {
+        dst.fs.create(dst.path, false).close();
+      }
+
+      InputStream is = null;
+      FSDataOutputStream fos = dst.fs.append(dst.path);
+
+      try {
+        if (readStdin) {
+          if (args.size() == 0) {
+            IOUtils.copyBytes(System.in, fos, DEFAULT_IO_LENGTH);
+          } else {
+            throw new IOException(
+                "stdin (-) must be the sole input argument when present");
+          }
+        }
+
+        // Read in each input file and write to the target.
+        for (PathData source : args) {
+          is = new FileInputStream(source.toFile());
+          IOUtils.copyBytes(is, fos, DEFAULT_IO_LENGTH);
+          IOUtils.closeStream(is);
+          is = null;
+        }
+      } finally {
+        if (is != null) {
+          IOUtils.closeStream(is);
+        }
+
+        if (fos != null) {
+          IOUtils.closeStream(fos);
+        }
+      }
+    }
+  }
 }
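
The new `AppendToFile` command above boils down to a small pattern against the `FileSystem` API: create the destination if it is missing, open an append stream, and copy each local source into it. Below is a minimal standalone sketch of that pattern, assuming a `Configuration` whose default filesystem supports append (e.g. HDFS); the class name `AppendSketch` and the argument layout are illustrative, not part of the commit.

```java
import java.io.FileInputStream;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class AppendSketch {
  public static void main(String[] args) throws Exception {
    // Assumes the default filesystem in the Configuration supports append.
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    // Last argument is the destination, the rest are local sources,
    // mirroring the "<localsrc> ... <dst>" usage of the shell command.
    Path dst = new Path(args[args.length - 1]);

    // Create the destination first if it does not exist, as
    // AppendToFile#processArguments does before appending.
    if (!fs.exists(dst)) {
      fs.create(dst, false).close();
    }

    FSDataOutputStream out = fs.append(dst);
    try {
      for (int i = 0; i < args.length - 1; i++) {
        InputStream in = new FileInputStream(args[i]);
        try {
          // 1 MB buffer, matching DEFAULT_IO_LENGTH in the commit.
          IOUtils.copyBytes(in, out, 1024 * 1024);
        } finally {
          IOUtils.closeStream(in);
        }
      }
    } finally {
      IOUtils.closeStream(out);
    }
  }
}
```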

+ 22 - 1
hadoop-common-project/hadoop-common/src/site/apt/FileSystemShell.apt.vm

@@ -45,6 +45,27 @@ bin/hadoop fs <args>
    Differences are described with each of the commands. Error information is
    sent to stderr and the output is sent to stdout.
 
+appendToFile
+
+      Usage: <<<hdfs dfs -appendToFile <localsrc> ... <dst> >>>
+
+      Append single src, or multiple srcs from local file system to the
+      destination file system. Also reads input from stdin and appends to
+      destination file system.
+
+        * <<<hdfs dfs -appendToFile localfile /user/hadoop/hadoopfile>>>
+
+        * <<<hdfs dfs -appendToFile localfile1 localfile2 /user/hadoop/hadoopfile>>>
+
+        * <<<hdfs dfs -appendToFile localfile hdfs://nn.example.com/hadoop/hadoopfile>>>
+
+        * <<<hdfs dfs -appendToFile - hdfs://nn.example.com/hadoop/hadoopfile>>>
+          Reads the input from stdin.
+
+      Exit Code:
+
+      Returns 0 on success and 1 on error.
+
 cat
 
    Usage: <<<hdfs dfs -cat URI [URI ...]>>>
@@ -76,7 +97,7 @@ chmod
 
    Change the permissions of files. With -R, make the change recursively
    through the directory structure. The user must be the owner of the file, or
-   else a super-user. Additional information is in the 
+   else a super-user. Additional information is in the
    {{{betterurl}Permissions Guide}}.
 
 chown
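
The stdin form documented above (`-appendToFile - <dst>`) can also be driven programmatically, the same way the unit tests drive the shell. A hedged sketch, assuming the hypothetical destination path `/user/hadoop/hadoopfile` from the examples above; substituting `System.in` works because the command copies `System.in` to the append stream only when it runs:

```java
import java.io.ByteArrayInputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.util.ToolRunner;

public class StdinAppendSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Substitute stdin so the "-" argument reads these bytes instead
    // of the terminal. Path below is illustrative.
    System.setIn(new ByteArrayInputStream("appended line\n".getBytes("UTF-8")));

    // Equivalent to:
    //   echo "appended line" | hdfs dfs -appendToFile - /user/hadoop/hadoopfile
    int exitCode = ToolRunner.run(new FsShell(conf),
        new String[] { "-appendToFile", "-", "/user/hadoop/hadoopfile" });
    System.exit(exitCode);
  }
}
```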

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -58,6 +58,9 @@ Release 2.1.1-beta - UNRELEASED
     HDFS-5061. Make FSNameSystem#auditLoggers an unmodifiable list. 
     (Arpit Agarwal via suresh)
 
+    HDFS-4905. Add appendToFile command to "hdfs dfs". (Arpit Agarwal via
+    cnauroth)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 96 - 15
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSShell.java

@@ -17,17 +17,7 @@
  */
 package org.apache.hadoop.hdfs;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.PrintStream;
-import java.io.PrintWriter;
+import java.io.*;
 import java.security.Permission;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
@@ -42,10 +32,7 @@ import java.util.zip.GZIPOutputStream;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSInputChecker;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FsShell;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -63,6 +50,9 @@ import org.apache.hadoop.util.ToolRunner;
 import org.junit.Test;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.junit.Assert.*;
 
 /**
  * This class tests commands from DFSShell.
@@ -101,6 +91,18 @@ public class TestDFSShell {
     return f;
   }
 
+  static File createLocalFileWithRandomData(int fileLength, File f)
+      throws IOException {
+    assertTrue(!f.exists());
+    f.createNewFile();
+    FileOutputStream out = new FileOutputStream(f.toString());
+    byte[] buffer = new byte[fileLength];
+    out.write(buffer);
+    out.flush();
+    out.close();
+    return f;
+  }
+
   static void show(String s) {
     System.out.println(Thread.currentThread().getStackTrace()[2] + " " + s);
   }
@@ -1732,6 +1734,85 @@ public class TestDFSShell {
     }
   }
 
+
+  @Test (timeout = 300000)
+  public void testAppendToFile() throws Exception {
+    final int inputFileLength = 1024 * 1024;
+    File testRoot = new File(TEST_ROOT_DIR, "testAppendtoFileDir");
+    testRoot.mkdirs();
+
+    File file1 = new File(testRoot, "file1");
+    File file2 = new File(testRoot, "file2");
+    createLocalFileWithRandomData(inputFileLength, file1);
+    createLocalFileWithRandomData(inputFileLength, file2);
+
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+
+    try {
+      FileSystem dfs = cluster.getFileSystem();
+      assertTrue("Not a HDFS: " + dfs.getUri(),
+                 dfs instanceof DistributedFileSystem);
+
+      // Run appendToFile once, make sure that the target file is
+      // created and is of the right size.
+      Path remoteFile = new Path("/remoteFile");
+      FsShell shell = new FsShell();
+      shell.setConf(conf);
+      String[] argv = new String[] {
+          "-appendToFile", file1.toString(), file2.toString(), remoteFile.toString() };
+      int res = ToolRunner.run(shell, argv);
+      assertThat(res, is(0));
+      assertThat(dfs.getFileStatus(remoteFile).getLen(), is((long) inputFileLength * 2));
+
+      // Run the command once again and make sure that the target file
+      // size has been doubled.
+      res = ToolRunner.run(shell, argv);
+      assertThat(res, is(0));
+      assertThat(dfs.getFileStatus(remoteFile).getLen(), is((long) inputFileLength * 4));
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  @Test (timeout = 300000)
+  public void testAppendToFileBadArgs() throws Exception {
+    final int inputFileLength = 1024 * 1024;
+    File testRoot = new File(TEST_ROOT_DIR, "testAppendToFileBadArgsDir");
+    testRoot.mkdirs();
+
+    File file1 = new File(testRoot, "file1");
+    createLocalFileWithRandomData(inputFileLength, file1);
+
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+
+    try {
+      FileSystem dfs = cluster.getFileSystem();
+      assertTrue("Not a HDFS: " + dfs.getUri(),
+                 dfs instanceof DistributedFileSystem);
+
+      // Run appendToFile with insufficient arguments.
+      FsShell shell = new FsShell();
+      shell.setConf(conf);
+      String[] argv = new String[] {
+          "-appendToFile", file1.toString() };
+      int res = ToolRunner.run(shell, argv);
+      assertThat(res, not(0));
+
+      // Mix stdin with other input files. Must fail.
+      Path remoteFile = new Path("/remoteFile");
+      argv = new String[] {
+          "-appendToFile", file1.toString(), "-", remoteFile.toString() };
+      res = ToolRunner.run(shell, argv);
+      assertThat(res, not(0));
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   /**
    * Test that the server trash configuration is respected when
    * the client configuration is not set.