Przeglądaj źródła

HADOOP-18023. Allow cp command to run with multi threads. (#3721)

(cherry picked from commit 932a78fe38b34a923f6852a1a19482075806ecba)
smarthan 3 lat temu
rodzic
commit
2f3e186978

+ 23 - 16
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java

@@ -146,33 +146,40 @@ class CopyCommands {
     }
   }
 
-  static class Cp extends CommandWithDestination {
+  static class Cp extends CopyCommandWithMultiThread {
     public static final String NAME = "cp";
     public static final String USAGE =
-        "[-f] [-p | -p[topax]] [-d] <src> ... <dst>";
+        "[-f] [-p | -p[topax]] [-d] [-t <thread count>]"
+            + " [-q <thread pool queue size>] <src> ... <dst>";
     public static final String DESCRIPTION =
-      "Copy files that match the file pattern <src> to a " +
-      "destination.  When copying multiple files, the destination " +
-      "must be a directory. Passing -p preserves status " +
-      "[topax] (timestamps, ownership, permission, ACLs, XAttr). " +
-      "If -p is specified with no <arg>, then preserves " +
-      "timestamps, ownership, permission. If -pa is specified, " +
-      "then preserves permission also because ACL is a super-set of " +
-      "permission. Passing -f overwrites the destination if it " +
-      "already exists. raw namespace extended attributes are preserved " +
-      "if (1) they are supported (HDFS only) and, (2) all of the source and " +
-      "target pathnames are in the /.reserved/raw hierarchy. raw namespace " +
-      "xattr preservation is determined solely by the presence (or absence) " +
-        "of the /.reserved/raw prefix and not by the -p option. Passing -d "+
-        "will skip creation of temporary file(<dst>._COPYING_).\n";
+        "Copy files that match the file pattern <src> to a destination."
+            + " When copying multiple files, the destination must be a "
+            + "directory.\nFlags :\n"
+            + "  -p[topax] : Preserve file attributes [topx] (timestamps, "
+            + "ownership, permission, ACL, XAttr). If -p is specified with "
+            + "no arg, then preserves timestamps, ownership, permission. "
+            + "If -pa is specified, then preserves permission also because "
+            + "ACL is a super-set of permission. Determination of whether raw "
+            + "namespace extended attributes are preserved is independent of "
+            + "the -p flag.\n"
+            + "  -f : Overwrite the destination if it already exists.\n"
+            + "  -d : Skip creation of temporary file(<dst>._COPYING_).\n"
+            + "  -t <thread count> : Number of threads to be used, "
+            + "default is 1.\n"
+            + "  -q <thread pool queue size> : Thread pool queue size to be "
+            + "used, default is 1024.\n";
 
     @Override
     protected void processOptions(LinkedList<String> args) throws IOException {
       popPreserveOption(args);
       CommandFormat cf = new CommandFormat(2, Integer.MAX_VALUE, "f", "d");
+      cf.addOptionWithValue("t");
+      cf.addOptionWithValue("q");
       cf.parse(args);
       setDirectWrite(cf.getOpt("d"));
       setOverwrite(cf.getOpt("f"));
+      setThreadCount(cf.getOptValue("t"));
+      setThreadPoolQueueSize(cf.getOptValue("q"));
       // should have a -r option
       setRecursive(true);
       getRemoteDestination(args);

+ 8 - 3
hadoop-common-project/hadoop-common/src/site/markdown/FileSystemShell.md

@@ -170,7 +170,7 @@ Returns 0 on success and -1 on error.
 cp
 ----
 
-Usage: `hadoop fs -cp [-f] [-p | -p[topax]] URI [URI ...] <dest> `
+Usage: `hadoop fs -cp [-f] [-p | -p[topax]] [-t <thread count>] [-q <thread pool queue size>] URI [URI ...] <dest>`
 
 Copy files from source to destination. This command allows multiple sources as well in which case the destination must be a directory.
 
@@ -178,13 +178,18 @@ Copy files from source to destination. This command allows multiple sources as w
 
 Options:
 
-* The -f option will overwrite the destination if it already exists.
-* The -p option will preserve file attributes [topx] (timestamps, ownership, permission, ACL, XAttr). If -p is specified with no *arg*, then preserves timestamps, ownership, permission. If -pa is specified, then preserves permission also because ACL is a super-set of permission. Determination of whether raw namespace extended attributes are preserved is independent of the -p flag.
+* `-f` : Overwrite the destination if it already exists.
+* `-d` : Skip creation of temporary file with the suffix `._COPYING_`.
+* `-p` : Preserve file attributes [topx] (timestamps, ownership, permission, ACL, XAttr). If -p is specified with no *arg*, then preserves timestamps, ownership, permission. If -pa is specified, then preserves permission also because ACL is a super-set of permission. Determination of whether raw namespace extended attributes are preserved is independent of the -p flag.
+* `-t <thread count>` : Number of threads to be used, default is 1. Useful when copying directories containing more than 1 file.
+* `-q <thread pool queue size>` : Thread pool queue size to be used, default is 1024. It takes effect only when thread count greater than 1.
 
 Example:
 
 * `hadoop fs -cp /user/hadoop/file1 /user/hadoop/file2`
 * `hadoop fs -cp /user/hadoop/file1 /user/hadoop/file2 /user/hadoop/dir`
+* `hadoop fs -cp -t 5 /user/hadoop/file1 /user/hadoop/file2 /user/hadoop/dir`
+* `hadoop fs -cp -t 10 -q 2048 /user/hadoop/file1 /user/hadoop/file2 /user/hadoop/dir`
 
 Exit Code:
 

+ 210 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCpCommand.java

@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.shell;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.shell.CopyCommands.Cp;
+
+import static org.apache.hadoop.fs.shell.CopyCommandWithMultiThread.DEFAULT_QUEUE_SIZE;
+import static org.junit.Assert.assertEquals;
+
+public class TestCpCommand {
+
+  private static final String FROM_DIR_NAME = "fromDir";
+  private static final String TO_DIR_NAME = "toDir";
+
+  private static FileSystem fs;
+  private static Path testDir;
+  private static Configuration conf;
+
+  private Path dir = null;
+  private int numFiles = 0;
+
+  private static int initialize(Path dir) throws Exception {
+    fs.mkdirs(dir);
+    Path fromDirPath = new Path(dir, FROM_DIR_NAME);
+    fs.mkdirs(fromDirPath);
+    Path toDirPath = new Path(dir, TO_DIR_NAME);
+    fs.mkdirs(toDirPath);
+
+    int numTotalFiles = 0;
+    int numDirs = RandomUtils.nextInt(0, 5);
+    for (int dirCount = 0; dirCount < numDirs; ++dirCount) {
+      Path subDirPath = new Path(fromDirPath, "subdir" + dirCount);
+      fs.mkdirs(subDirPath);
+      int numFiles = RandomUtils.nextInt(0, 10);
+      for (int fileCount = 0; fileCount < numFiles; ++fileCount) {
+        numTotalFiles++;
+        Path subFile = new Path(subDirPath, "file" + fileCount);
+        fs.createNewFile(subFile);
+        FSDataOutputStream output = fs.create(subFile, true);
+        for (int i = 0; i < 100; ++i) {
+          output.writeInt(i);
+          output.writeChar('\n');
+        }
+        output.close();
+      }
+    }
+
+    return numTotalFiles;
+  }
+
+  @BeforeClass
+  public static void init() throws Exception {
+    conf = new Configuration(false);
+    conf.set("fs.file.impl", LocalFileSystem.class.getName());
+    fs = FileSystem.getLocal(conf);
+    testDir = new FileSystemTestHelper().getTestRootPath(fs);
+    // don't want scheme on the path, just an absolute path
+    testDir = new Path(fs.makeQualified(testDir).toUri().getPath());
+
+    FileSystem.setDefaultUri(conf, fs.getUri());
+    fs.setWorkingDirectory(testDir);
+  }
+
+  @AfterClass
+  public static void cleanup() throws Exception {
+    fs.delete(testDir, true);
+    fs.close();
+  }
+
+  private void run(CopyCommandWithMultiThread cmd, String... args) {
+    cmd.setConf(conf);
+    assertEquals(0, cmd.run(args));
+  }
+
+  @Before
+  public void initDirectory() throws Exception {
+    dir = new Path("dir" + RandomStringUtils.randomNumeric(4));
+    numFiles = initialize(dir);
+  }
+
+  @Test(timeout = 10000)
+  public void testCp() throws Exception {
+    MultiThreadedCp copy = new MultiThreadedCp(1, DEFAULT_QUEUE_SIZE, 0);
+    run(copy, new Path(dir, FROM_DIR_NAME).toString(),
+        new Path(dir, TO_DIR_NAME).toString());
+    assert copy.getExecutor() == null;
+  }
+
+  @Test(timeout = 10000)
+  public void testCpWithThreads() {
+    run(new MultiThreadedCp(5, DEFAULT_QUEUE_SIZE, numFiles), "-t", "5",
+        new Path(dir, FROM_DIR_NAME).toString(),
+        new Path(dir, TO_DIR_NAME).toString());
+  }
+
+  @Test(timeout = 10000)
+  public void testCpWithThreadWrong() {
+    run(new MultiThreadedCp(1, DEFAULT_QUEUE_SIZE, 0), "-t", "0",
+        new Path(dir, FROM_DIR_NAME).toString(),
+        new Path(dir, TO_DIR_NAME).toString());
+  }
+
+  @Test(timeout = 10000)
+  public void testCpWithThreadsAndQueueSize() {
+    int queueSize = 256;
+    run(new MultiThreadedCp(5, queueSize, numFiles), "-t", "5", "-q",
+        Integer.toString(queueSize),
+        new Path(dir, FROM_DIR_NAME).toString(),
+        new Path(dir, TO_DIR_NAME).toString());
+  }
+
+  @Test(timeout = 10000)
+  public void testCpWithThreadsAndQueueSizeWrong() {
+    int queueSize = 0;
+    run(new MultiThreadedCp(5, DEFAULT_QUEUE_SIZE, numFiles), "-t", "5", "-q",
+        Integer.toString(queueSize),
+        new Path(dir, FROM_DIR_NAME).toString(),
+        new Path(dir, TO_DIR_NAME).toString());
+  }
+
+  @Test(timeout = 10000)
+  public void testCpSingleFile() throws Exception {
+    Path fromDirPath = new Path(dir, FROM_DIR_NAME);
+    Path subFile = new Path(fromDirPath, "file0");
+    fs.createNewFile(subFile);
+    FSDataOutputStream output = fs.create(subFile, true);
+    for (int i = 0; i < 100; ++i) {
+      output.writeInt(i);
+      output.writeChar('\n');
+    }
+    output.close();
+
+    MultiThreadedCp copy = new MultiThreadedCp(5, DEFAULT_QUEUE_SIZE, 0);
+    run(copy, "-t", "5", subFile.toString(),
+        new Path(dir, TO_DIR_NAME).toString());
+    assert copy.getExecutor() == null;
+  }
+
+  private static class MultiThreadedCp extends Cp {
+    public static final String NAME = "multiThreadCp";
+    private final int expectedThreads;
+    private final int expectedQueuePoolSize;
+    private final int expectedCompletedTaskCount;
+
+    MultiThreadedCp(int expectedThreads, int expectedQueuePoolSize,
+        int expectedCompletedTaskCount) {
+      this.expectedThreads = expectedThreads;
+      this.expectedQueuePoolSize = expectedQueuePoolSize;
+      this.expectedCompletedTaskCount = expectedCompletedTaskCount;
+    }
+
+    @Override
+    protected void processArguments(LinkedList<PathData> args)
+        throws IOException {
+      // Check if the number of threads are same as expected
+      Assert.assertEquals(expectedThreads, getThreadCount());
+      // Check if the queue pool size of executor is same as expected
+      Assert.assertEquals(expectedQueuePoolSize, getThreadPoolQueueSize());
+
+      super.processArguments(args);
+
+      if (isMultiThreadNecessary(args)) {
+        // Once the copy is complete, check following
+        // 1) number of completed tasks are same as expected
+        // 2) There are no active tasks in the executor
+        // 3) Executor has shutdown correctly
+        ThreadPoolExecutor executor = getExecutor();
+        Assert.assertEquals(expectedCompletedTaskCount,
+            executor.getCompletedTaskCount());
+        Assert.assertEquals(0, executor.getActiveCount());
+        Assert.assertTrue(executor.isTerminated());
+      } else {
+        assert getExecutor() == null;
+      }
+    }
+  }
+}

+ 34 - 13
hadoop-common-project/hadoop-common/src/test/resources/testConf.xml

@@ -324,51 +324,72 @@
       <comparators>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^-cp \[-f\] \[-p \| -p\[topax\]\] \[-d\] &lt;src&gt; \.\.\. &lt;dst&gt; :\s*</expected-output>
+          <expected-output>^-cp \[-f\] \[-p \| -p\[topax\]\] \[-d\] \[-t &lt;thread count&gt;\] \[-q &lt;thread pool queue size&gt;\] &lt;src&gt; \.\.\. &lt;dst&gt; :\s*</expected-output>
         </comparator>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^\s*Copy files that match the file pattern &lt;src&gt; to a destination.  When copying( )*</expected-output>
+          <expected-output>^\s*Copy files that match the file pattern &lt;src&gt; to a destination. When copying( )*</expected-output>
         </comparator>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^( |\t)*multiple files, the destination must be a directory.( )*Passing -p preserves status( )*</expected-output>
+          <expected-output>^( |\t)*multiple files, the destination must be a directory.( )*</expected-output>
         </comparator>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^( |\t)*\[topax\] \(timestamps, ownership, permission, ACLs, XAttr\). If -p is specified( )*</expected-output>
+          <expected-output>^( |\t)*Flags :( )*</expected-output>
         </comparator>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^( |\t)*with no &lt;arg&gt;, then preserves timestamps, ownership, permission. If -pa is( )*</expected-output>
+          <expected-output>^( |\t)*-p\[topax\]\s+Preserve file attributes \[topx\] \(timestamps,( )*</expected-output>
         </comparator>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^( |\t)*specified, then preserves permission also because ACL is a super-set of( )*</expected-output>
+          <expected-output>^( |\t)*ownership, permission, ACL, XAttr\). If -p is( )*</expected-output>
         </comparator>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^( |\t)*permission. Passing -f overwrites the destination if it already exists. raw( )*</expected-output>
+          <expected-output>^( |\t)*specified with no arg, then preserves timestamps,( )*</expected-output>
         </comparator>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^( |\t)*namespace extended attributes are preserved if \(1\) they are supported \(HDFS( )*</expected-output>
+          <expected-output>^( |\t)*ownership, permission. If -pa is specified, then( )*</expected-output>
         </comparator>
         <comparator>
             <type>RegexpComparator</type>
-            <expected-output>^( |\t)*only\) and, \(2\) all of the source and target pathnames are in the \/\.reserved\/raw( )*</expected-output>
+            <expected-output>^( |\t)*preserves permission also because ACL is a( )*</expected-output>
         </comparator>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^( |\t)*hierarchy. raw namespace xattr preservation is determined solely by the presence( )*</expected-output>
+          <expected-output>^( |\t)*super-set of permission. Determination of whether( )*</expected-output>
         </comparator>
+
         <comparator>
-            <type>RegexpComparator</type>
-          <expected-output>^\s*\(or absence\) of the \/\.reserved\/raw prefix and not by the -p option\. Passing -d( )*</expected-output>
+          <type>RegexpComparator</type>
+          <expected-output>^( |\t)*raw namespace extended attributes are preserved is( )*</expected-output>
+        </comparator>
+        <comparator>
+          <type>RegexpComparator</type>
+          <expected-output>^( |\t)*independent of the -p flag.( )*</expected-output>
         </comparator>
         <comparator>
             <type>RegexpComparator</type>
-            <expected-output>^\s*will skip creation of temporary file\(&lt;dst&gt;\._COPYING_\)\.( )*</expected-output>
+          <expected-output>^\s*-f\s+Overwrite the destination if it already exists.( )*</expected-output>
+        </comparator>
+        <comparator>
+          <type>RegexpComparator</type>
+          <expected-output>^\s*-d\s+Skip creation of temporary file\(&lt;dst&gt;\._COPYING_\).( )*</expected-output>
+        </comparator>
+        <comparator>
+          <type>RegexpComparator</type>
+          <expected-output>^\s*-t &lt;thread count&gt;\s+Number of threads to be used, default is 1.( )*</expected-output>
+        </comparator>
+        <comparator>
+          <type>RegexpComparator</type>
+          <expected-output>^\s*-q &lt;thread pool queue size&gt;\s+Thread pool queue size to be used, default is( )*</expected-output>
+        </comparator>
+        <comparator>
+          <type>RegexpComparator</type>
+          <expected-output>^( |\t)*1024.\s*</expected-output>
         </comparator>
       </comparators>
     </test>