浏览代码

MAPREDUCE-2786. Add compression option for TestDFSIO. Contributed by Plamen Jeliazkov.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1390150 13f79535-47bb-0310-9956-ffa450edef68
Konstantin Shvachko 12 年之前
父节点
当前提交
d00a1a476f

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -8,6 +8,9 @@ Release 0.23.4 - UNRELEASED
 
   IMPROVEMENTS
 
+    MAPREDUCE-2786. Add compression option for TestDFSIO.
+    (Plamen Jeliazkov via shv)
+
     MAPREDUCE-4645. Provide a random seed to Slive to make the sequence
     of file names deterministic. (Ravi Prakash via shv)
 

+ 19 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/IOMapperBase.java

@@ -22,7 +22,9 @@ import java.net.InetAddress;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * Base mapper class for IO operations.
@@ -41,6 +43,7 @@ public abstract class IOMapperBase<T> extends Configured
   protected int bufferSize;
   protected FileSystem fs;
   protected String hostName;
+  protected CompressionCodec compressionCodec;
 
   public IOMapperBase() { 
   }
@@ -59,6 +62,22 @@ public abstract class IOMapperBase<T> extends Configured
     } catch(Exception e) {
       hostName = "localhost";
     }
+    
+    //grab compression
+    String compression = getConf().get("test.io.compression.class", null);
+    Class<? extends CompressionCodec> codec;
+
+    //try to initialize codec
+    try {
+      codec = (compression == null) ? null : 
+     Class.forName(compression).asSubclass(CompressionCodec.class);
+    } catch(Exception e) {
+      throw new RuntimeException("Compression codec not found: ", e);
+    }
+
+    if(codec != null) {
+      compressionCodec = (CompressionCodec) ReflectionUtils.newInstance(codec, getConf());
+    }
   }
 
   public void close() throws IOException {

+ 17 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java

@@ -23,6 +23,7 @@ import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.PrintStream;
@@ -295,6 +296,8 @@ public class TestDFSIO extends TestCase implements Tool {
       // create file
       OutputStream out;
       out = fs.create(new Path(getDataDir(getConf()), name), true, bufferSize);
+    
+      if(compressionCodec != null) out = compressionCodec.createOutputStream(out);
       
       try {
         // write to the file
@@ -358,6 +361,8 @@ public class TestDFSIO extends TestCase implements Tool {
       OutputStream out;
       out = fs.append(new Path(getDataDir(getConf()), name), bufferSize);
       
+      if(compressionCodec != null) out = compressionCodec.createOutputStream(out);
+      
       try {
         // write to the file
         long nrRemaining;
@@ -394,7 +399,10 @@ public class TestDFSIO extends TestCase implements Tool {
                        long totalSize // in bytes
                      ) throws IOException {
       // open file
-      DataInputStream in = fs.open(new Path(getDataDir(getConf()), name));
+      InputStream in = fs.open(new Path(getDataDir(getConf()), name));
+      
+      if(compressionCodec != null) in = compressionCodec.createInputStream(in);
+      
       long actualSize = 0;
       try {
         while (actualSize < totalSize) {
@@ -459,6 +467,7 @@ public class TestDFSIO extends TestCase implements Tool {
     long fileSize = 1*MEGA;
     int nrFiles = 1;
     String resFileName = DEFAULT_RES_FILE_NAME;
+    String compressionClass = null;
     boolean isSequential = false;
     String version = TestDFSIO.class.getSimpleName() + ".0.0.6";
 
@@ -479,6 +488,8 @@ public class TestDFSIO extends TestCase implements Tool {
         testType = TEST_TYPE_CLEANUP;
       } else if (args[i].startsWith("-seq")) {
         isSequential = true;
+      } else if (args[i].startsWith("-compression")) {
+        compressionClass = args[++i];
       } else if (args[i].equals("-nrFiles")) {
         nrFiles = Integer.parseInt(args[++i]);
       } else if (args[i].equals("-fileSize")) {
@@ -497,6 +508,11 @@ public class TestDFSIO extends TestCase implements Tool {
     LOG.info("fileSize (MB) = " + toMB(fileSize));
     LOG.info("bufferSize = " + bufferSize);
     LOG.info("baseDir = " + getBaseDir(config));
+    
+    if(compressionClass != null) {
+      config.set("test.io.compression.class", compressionClass);
+      LOG.info("compressionClass = " + compressionClass);
+    }
 
     config.setInt("test.io.file.buffer.size", bufferSize);
     config.setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true);