소스 검색

HADOOP-18989. Use thread pool to improve the speed of creating control files in TestDFSIO (#6294). Contributed by farmmamba.

Signed-off-by: Shuyan Zhang <zhangshuyan@apache.org>
hfutatzhanghb 1 년 전
부모
커밋
e91aec930f

+ 81 - 9
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java

@@ -32,7 +32,16 @@ import java.text.DecimalFormat;
 import java.util.Collection;
 import java.util.Date;
 import java.util.StringTokenizer;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -116,6 +125,10 @@ public class TestDFSIO implements Tool {
       "test.io.block.storage.policy";
   private static final String ERASURE_CODE_POLICY_NAME_KEY =
       "test.io.erasure.code.policy";
+  private ExecutorService excutorService = Executors.newFixedThreadPool(
+      2 * Runtime.getRuntime().availableProcessors());
+  private CompletionService<String> completionService =
+      new ExecutorCompletionService<>(excutorService);
 
   static{
     Configuration.addDefaultResource("hdfs-default.xml");
@@ -289,12 +302,43 @@ public class TestDFSIO implements Tool {
     bench.analyzeResult(fs, TestType.TEST_TYPE_TRUNCATE, execTime);
   }
 
+  private class ControlFileCreateTask implements Runnable {
+    private SequenceFile.Writer writer = null;
+    private String name;
+    private long nrBytes;
+
+    ControlFileCreateTask(SequenceFile.Writer writer, String name,
+        long nrBytes) {
+      this.writer = writer;
+      this.name = name;
+      this.nrBytes = nrBytes;
+    }
+
+    @Override
+    public void run() {
+      try {
+        writer.append(new Text(name), new LongWritable(nrBytes));
+      } catch (Exception e) {
+        LOG.error(e.getLocalizedMessage());
+      } finally {
+        if (writer != null) {
+          try {
+            writer.close();
+          } catch (IOException e) {
+            LOG.error(e.toString());
+          }
+        }
+        writer = null;
+      }
+    }
+  }
+
   @SuppressWarnings("deprecation")
   private void createControlFile(FileSystem fs,
                                   long nrBytes, // in bytes
                                   int nrFiles
                                 ) throws IOException {
-    LOG.info("creating control file: "+nrBytes+" bytes, "+nrFiles+" files");
+    LOG.info("creating control file: " + nrBytes + " bytes, " + nrFiles + " files");
     final int maxDirItems = config.getInt(
         DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_KEY,
         DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_DEFAULT);
@@ -308,7 +352,7 @@ public class TestDFSIO implements Tool {
 
     fs.delete(controlDir, true);
 
-    for(int i=0; i < nrFiles; i++) {
+    for (int i = 0; i < nrFiles; i++) {
       String name = getFileName(i);
       Path controlFile = new Path(controlDir, "in_file_" + name);
       SequenceFile.Writer writer = null;
@@ -316,19 +360,42 @@ public class TestDFSIO implements Tool {
         writer = SequenceFile.createWriter(fs, config, controlFile,
                                            Text.class, LongWritable.class,
                                            CompressionType.NONE);
-        writer.append(new Text(name), new LongWritable(nrBytes));
+        Runnable controlFileCreateTask = new ControlFileCreateTask(writer, name, nrBytes);
+        completionService.submit(controlFileCreateTask, "success");
       } catch(Exception e) {
         throw new IOException(e.getLocalizedMessage());
-      } finally {
-        if (writer != null) {
-          writer.close();
+      }
+    }
+
+    boolean isSuccess = false;
+    int count = 0;
+    for (int i = 0; i < nrFiles; i++) {
+      try {
+        // Since control file is quiet small, we use 3 minutes here.
+        Future<String> future = completionService.poll(3, TimeUnit.MINUTES);
+        if (future != null) {
+          future.get(3, TimeUnit.MINUTES);
+          count++;
+        } else {
+          break;
         }
-        writer = null;
+      } catch (ExecutionException | InterruptedException | TimeoutException e) {
+        throw new IOException(e);
       }
     }
-    LOG.info("created control files for: " + nrFiles + " files");
+
+    if (count == nrFiles) {
+      isSuccess = true;
+    }
+
+    if (isSuccess) {
+      LOG.info("created control files for: " + nrFiles + " files");
+    } else {
+      throw new IOException("Create control files timeout.");
+    }
   }
 
+
   private static String getFileName(int fIdx) {
     return BASE_FILE_NAME + Integer.toString(fIdx);
   }
@@ -865,7 +932,12 @@ public class TestDFSIO implements Tool {
       cleanup(fs);
       return 0;
     }
-    createControlFile(fs, nrBytes, nrFiles);
+    try {
+      createControlFile(fs, nrBytes, nrFiles);
+    } catch (IOException e) {
+      LOG.warn(e.toString());
+      throw new IOException(e);
+    }
     long tStart = System.currentTimeMillis();
     switch(testType) {
     case TEST_TYPE_WRITE: