
MAPREDUCE-7469. NNBench createControlFiles should use thread pool to improve performance. (#6463) Contributed by liuguanghua.

Signed-off-by: Shilun Fan <slfan1989@apache.org>
LiuGuH · 1 year ago
commit a60b5e2de3

+ 43 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/hdfs/NNBench.java

@@ -26,9 +26,15 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.PrintStream;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.Iterator;
+import java.util.List;
 import java.util.StringTokenizer;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
@@ -55,6 +61,7 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -134,27 +141,61 @@ public class NNBench extends Configured implements Tool {
    * 
    * @throws IOException on error
    */
-  private void createControlFiles() throws IOException {
+  private void createControlFiles() {
     LOG.info("Creating " + numberOfMaps + " control files");
 
+    ExecutorService executorService =
+        HadoopExecutors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors());
+    List<Future<Void>> list = new ArrayList<>();
     for (int i = 0; i < numberOfMaps; i++) {
       String strFileName = "NNBench_Controlfile_" + i;
       Path filePath = new Path(new Path(baseDir, CONTROL_DIR_NAME),
               strFileName);
 
+      Future<Void> future = executorService.submit(new CreateControlFile(strFileName, filePath, i));
+      list.add(future);
+    }
+
+    for (int i = 0; i < list.size(); i++) {
+      try {
+        list.get(i).get();
+      } catch (InterruptedException | ExecutionException e) {
+        LOG.error("Creating control files Error.", e);
+      }
+    }
+
+    executorService.shutdown();
+  }
+
+  private class CreateControlFile implements Callable<Void> {
+    private String strFileName;
+    private Path filePath;
+    private int order;
+
+    CreateControlFile(String strFileName, Path filePath, int order) {
+      this.strFileName = strFileName;
+      this.filePath = filePath;
+      this.order = order;
+    }
+
+    @Override
+    public Void call() throws Exception {
       SequenceFile.Writer writer = null;
       try {
         writer = SequenceFile.createWriter(getConf(), Writer.file(filePath),
             Writer.keyClass(Text.class), Writer.valueClass(LongWritable.class),
             Writer.compression(CompressionType.NONE));
-        writer.append(new Text(strFileName), new LongWritable(i));
+        writer.append(new Text(strFileName), new LongWritable(order));
       } finally {
         if (writer != null) {
           writer.close();
         }
       }
+      return null;
     }
+
   }
+
   /**
    * Display version
    */
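
Taken together, the change replaces the old sequential loop (one SequenceFile write per iteration) with the standard submit-then-drain executor pattern: each control file write is wrapped in a Callable, submitted to a fixed pool sized at 2 × availableProcessors, and the calling thread blocks on the resulting Futures before shutting the pool down. The sketch below shows the same pattern in isolation, using java.util.concurrent's Executors rather than HadoopExecutors, with a placeholder task body standing in for the SequenceFile write; the class and variable names are illustrative, not part of the patch.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitThenDrainSketch {
  public static void main(String[] args) throws Exception {
    int numberOfMaps = 5; // NNBench takes this from the -maps option
    ExecutorService pool =
        Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors());

    // Submit one task per control file, keeping the Futures in order.
    List<Future<Void>> futures = new ArrayList<>();
    for (int i = 0; i < numberOfMaps; i++) {
      final int order = i;
      futures.add(pool.submit((Callable<Void>) () -> {
        // Placeholder for SequenceFile.createWriter(...) + writer.append(...).
        System.out.println("writing NNBench_Controlfile_" + order);
        return null;
      }));
    }

    // Drain: block until every write has completed (or failed).
    for (Future<Void> future : futures) {
      future.get();
    }
    pool.shutdown();
  }
}

Because each task opens its own writer and touches only its own file, the tasks are independent and need no synchronization; the per-future get() calls are what make createControlFiles wait for every write before returning.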

+ 20 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/hdfs/TestNNBench.java

@@ -25,6 +25,7 @@ import java.io.File;
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.HadoopTestCase;
 import org.apache.hadoop.mapred.JobConf;
@@ -37,6 +38,8 @@ public class TestNNBench extends HadoopTestCase {
   private static final String BASE_DIR =
       new File(System.getProperty("test.build.data", "build/test/data"),
           "NNBench").getAbsolutePath();
+  private static final String CONTROL_DIR_NAME = "control";
+
 
   public TestNNBench() throws IOException {
     super(LOCAL_MR, LOCAL_FS, 1, 1);
@@ -74,6 +77,15 @@ public class TestNNBench extends HadoopTestCase {
         getFileSystem().exists(renamedPath));
   }
 
+  @Test(timeout = 30000)
+  public void testNNBenchCreateControlFilesWithPool() throws Exception {
+    runNNBench(createJobConf(), "create_write", BASE_DIR, "5");
+    Path path = new Path(BASE_DIR, CONTROL_DIR_NAME);
+
+    FileStatus[] fileStatuses = getFileSystem().listStatus(path);
+    assertEquals(5, fileStatuses.length);
+  }
+
   @Test(timeout = 30000)
   public void testNNBenchCrossCluster() throws Exception {
     MiniDFSCluster dfsCluster = new MiniDFSCluster.Builder(new JobConf())
@@ -97,6 +109,14 @@ public class TestNNBench extends HadoopTestCase {
     assertEquals(0, ToolRunner.run(conf, new NNBench(), genArgs));
   }
 
+  private void runNNBench(Configuration conf, String operation, String baseDir, String numMaps)
+      throws Exception {
+    String[] genArgs = {"-operation", operation, "-baseDir", baseDir,
+        "-startTime", "" + (Time.now() / 1000 + 3), "-blockSize", "1024", "-maps", numMaps};
+
+    assertEquals(0, ToolRunner.run(conf, new NNBench(), genArgs));
+  }
+
   private void runNNBench(Configuration conf, String operation)
       throws Exception {
     runNNBench(conf, operation, BASE_DIR);
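
One detail worth noting in the new drain loop: it catches InterruptedException together with ExecutionException and only logs, which swallows the interrupt. A common stricter variant, sketched below under the assumption that the caller should still observe the interruption (the helper name drainFutures is illustrative, not part of the patch), restores the interrupt flag and stops waiting:

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class DrainSketch {
  private DrainSketch() {
  }

  // Wait for all futures; preserve interrupt status instead of swallowing it.
  static void drainFutures(List<Future<Void>> futures) {
    for (Future<Void> future : futures) {
      try {
        future.get();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the flag for callers
        break;                              // stop waiting once interrupted
      } catch (ExecutionException e) {
        throw new IllegalStateException("Control file creation failed", e.getCause());
      }
    }
  }
}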