Browse Source

HADOOP-1851. Permit specification of map output compression type and codec, independent of the final output's compression parameters. Contributed by Arun.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@581398 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 years ago
parent
commit
582ab150ff

+ 5 - 0
CHANGES.txt

@@ -70,6 +70,11 @@ Trunk (unreleased changes)
     namenodes and rebalancing processes to communicate with a primary 
     namenode.  (Hairong Kuang via dhruba)
 
+    HADOOP-1851.  Permit specification of map output compression type
+    and codec, independent of the final output's compression
+    parameters.  (Arun C Murthy via cutting)
+
+
   OPTIMIZATIONS
 
     HADOOP-1910.  Reduce the number of RPCs that DistributedFileSystem.create()

+ 26 - 3
conf/hadoop-default.xml

@@ -722,15 +722,22 @@ creations/deletions), or "all".</description>
 <property>
   <name>mapred.output.compress</name>
   <value>false</value>
-  <description>Should the outputs of the reduces be compressed?
+  <description>Should the job outputs be compressed?
+  </description>
+</property>
+
+<property>
+  <name>mapred.output.compression.type</name>
+  <value>RECORD</value>
+  <description>If the job outputs are to be compressed as SequenceFiles, how should
+               they be compressed? Should be one of NONE, RECORD or BLOCK.
   </description>
 </property>
 
 <property>
   <name>mapred.output.compression.codec</name>
   <value>org.apache.hadoop.io.compress.DefaultCodec</value>
-  <description>If the reduce outputs are compressed, how should they be 
-               compressed?
+  <description>If the job outputs are compressed, how should they be compressed?
   </description>
 </property>
 
@@ -742,6 +749,22 @@ creations/deletions), or "all".</description>
   </description>
 </property>
 
+<property>
+  <name>mapred.map.output.compression.type</name>
+  <value>RECORD</value>
+  <description>If the map outputs are to be compressed, how should they
+               be compressed? Should be one of NONE, RECORD or BLOCK.
+  </description>
+</property>
+
+<property>
+  <name>mapred.map.output.compression.codec</name>
+  <value>org.apache.hadoop.io.compress.DefaultCodec</value>
+  <description>If the map outputs are compressed, how should they be 
+               compressed?
+  </description>
+</property>
+
 <property>
   <name>io.seqfile.compress.blocksize</name>
   <value>1000000</value>

+ 22 - 1
src/java/org/apache/hadoop/io/MapFile.java

@@ -24,6 +24,8 @@ import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
 
 /** A file-based map from keys to values.
  * 
@@ -85,6 +87,16 @@ public class MapFile {
            compress, progress);
     }
 
+    /** Create the named map for keys of the named class. */
+    public Writer(Configuration conf, FileSystem fs, String dirName,
+                  Class keyClass, Class valClass,
+                  CompressionType compress, CompressionCodec codec,
+                  Progressable progress)
+      throws IOException {
+      this(conf, fs, dirName, WritableComparator.get(keyClass), valClass,
+           compress, codec, progress);
+    }
+
     /** Create the named map for keys of the named class. */
     public Writer(Configuration conf, FileSystem fs, String dirName,
                   Class keyClass, Class valClass, CompressionType compress)
@@ -112,6 +124,15 @@ public class MapFile {
                   SequenceFile.CompressionType compress,
                   Progressable progress)
       throws IOException {
+      this(conf, fs, dirName, comparator, valClass, 
+           compress, new DefaultCodec(), progress);
+    }
+    /** Create the named map using the named key comparator. */
+    public Writer(Configuration conf, FileSystem fs, String dirName,
+                  WritableComparator comparator, Class valClass,
+                  SequenceFile.CompressionType compress, CompressionCodec codec,
+                  Progressable progress)
+      throws IOException {
 
       this.comparator = comparator;
       this.lastKey = comparator.newKey();
@@ -126,7 +147,7 @@ public class MapFile {
       Class keyClass = comparator.getKeyClass();
       this.data =
         SequenceFile.createWriter
-        (fs, conf, dataFile, keyClass, valClass, compress, progress);
+        (fs, conf, dataFile, keyClass, valClass, compress, codec, progress);
       this.index =
         SequenceFile.createWriter
         (fs, conf, indexFile, keyClass, LongWritable.class,

+ 34 - 28
src/java/org/apache/hadoop/mapred/JobConf.java

@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.conf.Configuration;
 
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
 
 import org.apache.hadoop.mapred.lib.IdentityMapper;
@@ -296,6 +297,7 @@ public class JobConf extends Configuration {
   /**
    * Should the map outputs be compressed before transfer?
    * Uses the SequenceFile compression.
+   * @param compress should the map outputs be compressed?
    */
   public void setCompressMapOutput(boolean compress) {
     setBoolean("mapred.compress.map.output", compress);
@@ -303,60 +305,64 @@ public class JobConf extends Configuration {
   
   /**
    * Are the outputs of the maps be compressed?
-   * @return are they compressed?
+   * @return <code>true</code> if the outputs of the maps are to be compressed,
+   *         <code>false</code> otherwise
    */
   public boolean getCompressMapOutput() {
     return getBoolean("mapred.compress.map.output", false);
   }
 
   /**
-   * Set the compression type for the map outputs.
-   * @param style NONE, RECORD, or BLOCK to control how the map outputs are 
-   *        compressed
+   * Set the {@link CompressionType} for the map outputs.
+   * @param style the {@link CompressionType} to control how the map outputs  
+   *              are compressed
    */
-  public void setMapOutputCompressionType(SequenceFile.CompressionType style) {
-    set("map.output.compression.type", style.toString());
+  public void setMapOutputCompressionType(CompressionType style) {
+    set("mapred.map.output.compression.type", style.toString());
   }
   
   /**
-   * Get the compression type for the map outputs.
-   * @return the compression type, defaulting to job output compression type
+   * Get the {@link CompressionType} for the map outputs.
+   * @return the {@link CompressionType} for map outputs, defaulting to 
+   *         {@link CompressionType#RECORD} 
    */
-  public SequenceFile.CompressionType getMapOutputCompressionType() {
-    String val = get("map.output.compression.type", "RECORD");
-    return SequenceFile.CompressionType.valueOf(val);
+  public CompressionType getMapOutputCompressionType() {
+    String val = get("mapred.map.output.compression.type", 
+                     CompressionType.RECORD.toString());
+    return CompressionType.valueOf(val);
   }
-  
+
   /**
-   * Set the given class as the  compression codec for the map outputs.
-   * @param codecClass the CompressionCodec class that will compress the 
-   *                   map outputs
+   * Set the given class as the {@link CompressionCodec} for the map outputs.
+   * @param codecClass the {@link CompressionCodec} class that will compress  
+   *                   the map outputs
    */
-  public void setMapOutputCompressorClass(Class<? extends CompressionCodec> codecClass) {
-    setCompressMapOutput(true);
-    setClass("mapred.output.compression.codec", codecClass, 
+  public void 
+  setMapOutputCompressorClass(Class<? extends CompressionCodec> codecClass) {
+    setClass("mapred.map.output.compression.codec", codecClass, 
              CompressionCodec.class);
   }
   
   /**
-   * Get the codec for compressing the map outputs
-   * @param defaultValue the value to return if it is not set
-   * @return the CompressionCodec class that should be used to compress the 
-   *   map outputs
+   * Get the {@link CompressionCodec} for compressing the map outputs.
+   * @param defaultValue the {@link CompressionCodec} to return if not set
+   * @return the {@link CompressionCodec} class that should be used to compress the 
+   *         map outputs
    * @throws IllegalArgumentException if the class was specified, but not found
    */
-  public Class<? extends CompressionCodec> getMapOutputCompressorClass(Class<? extends CompressionCodec> defaultValue) {
-    String name = get("mapred.output.compression.codec");
-    if (name == null) {
-      return defaultValue;
-    } else {
+  public Class<? extends CompressionCodec> 
+  getMapOutputCompressorClass(Class<? extends CompressionCodec> defaultValue) {
+    Class<? extends CompressionCodec> codecClass = defaultValue;
+    String name = get("mapred.map.output.compression.codec");
+    if (name != null) {
       try {
-        return getClassByName(name).asSubclass(CompressionCodec.class);
+        codecClass = getClassByName(name).asSubclass(CompressionCodec.class);
       } catch (ClassNotFoundException e) {
         throw new IllegalArgumentException("Compression codec " + name + 
                                            " was not found.", e);
       }
     }
+    return codecClass;
   }
   
   /**

+ 18 - 2
src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java

@@ -28,8 +28,12 @@ import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.MapFile;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /** An {@link OutputFormat} that writes {@link MapFile}s. */
 public class MapFileOutputFormat extends OutputFormatBase {
@@ -39,13 +43,25 @@ public class MapFileOutputFormat extends OutputFormatBase {
     throws IOException {
 
     Path file = new Path(job.getOutputPath(), name);
-
+    
+    CompressionCodec codec = null;
+    CompressionType compressionType = CompressionType.NONE;
+    if (getCompressOutput(job)) {
+      // find the kind of compression to do
+      compressionType = SequenceFileOutputFormat.getOutputCompressionType(job);
+
+      // find the right codec
+      Class codecClass = getOutputCompressorClass(job, DefaultCodec.class);
+      codec = (CompressionCodec) 
+        ReflectionUtils.newInstance(codecClass, job);
+    }
+    
     // ignore the progress parameter, since MapFile is local
     final MapFile.Writer out =
       new MapFile.Writer(job, file.getFileSystem(job), file.toString(),
                          job.getOutputKeyClass(),
                          job.getOutputValueClass(),
-                         SequenceFile.getCompressionType(job),
+                         compressionType, codec,
                          progress);
 
     return new RecordWriter() {

+ 30 - 22
src/java/org/apache/hadoop/mapred/OutputFormatBase.java

@@ -33,54 +33,62 @@ public abstract class OutputFormatBase<K extends WritableComparable,
   implements OutputFormat<K, V> {
 
   /**
-   * Set whether the output of the reduce is compressed
-   * @param val the new setting
+   * Set whether the output of the job is compressed.
+   * @param conf the {@link JobConf} to modify
+   * @param compress should the output of the job be compressed?
    */
-  public static void setCompressOutput(JobConf conf, boolean val) {
-    conf.setBoolean("mapred.output.compress", val);
+  public static void setCompressOutput(JobConf conf, boolean compress) {
+    conf.setBoolean("mapred.output.compress", compress);
   }
   
   /**
-   * Is the reduce output compressed?
-   * @return true, if the output should be compressed
+   * Is the job output compressed?
+   * @param conf the {@link JobConf} to look in
+   * @return <code>true</code> if the job output should be compressed,
+   *         <code>false</code> otherwise
    */
   public static boolean getCompressOutput(JobConf conf) {
     return conf.getBoolean("mapred.output.compress", false);
   }
   
   /**
-   * Set the given class as the output compression codec.
-   * @param conf the JobConf to modify
-   * @param codecClass the CompressionCodec class that will compress the 
-   *                   reduce outputs
+   * Set the {@link CompressionCodec} to be used to compress job outputs.
+   * @param conf the {@link JobConf} to modify
+   * @param codecClass the {@link CompressionCodec} to be used to
+   *                   compress the job outputs
    */
-  public static void setOutputCompressorClass(JobConf conf, Class codecClass) {
+  public static void 
+  setOutputCompressorClass(JobConf conf, 
+                           Class<? extends CompressionCodec> codecClass) {
     setCompressOutput(conf, true);
     conf.setClass("mapred.output.compression.codec", codecClass, 
                   CompressionCodec.class);
   }
   
   /**
-   * Get the codec for compressing the reduce outputs
-   * @param conf the Configuration to look in
-   * @param defaultValue the value to return if it is not set
-   * @return the CompressionCodec class that should be used to compress the 
-   *   reduce outputs
+   * Get the {@link CompressionCodec} for compressing the job outputs.
+   * @param conf the {@link JobConf} to look in
+   * @param defaultValue the {@link CompressionCodec} to return if not set
+   * @return the {@link CompressionCodec} to be used to compress the 
+   *         job outputs
    * @throws IllegalArgumentException if the class was specified, but not found
    */
-  public static Class getOutputCompressorClass(JobConf conf, 
-                                               Class defaultValue) {
+  public static Class<? extends CompressionCodec> 
+  getOutputCompressorClass(JobConf conf, 
+		                       Class<? extends CompressionCodec> defaultValue) {
+    Class<? extends CompressionCodec> codecClass = defaultValue;
+    
     String name = conf.get("mapred.output.compression.codec");
-    if (name == null) {
-      return defaultValue;
-    } else {
+    if (name != null) {
       try {
-        return conf.getClassByName(name);
+        codecClass = 
+        	conf.getClassByName(name).asSubclass(CompressionCodec.class);
       } catch (ClassNotFoundException e) {
         throw new IllegalArgumentException("Compression codec " + name + 
                                            " was not found.", e);
       }
     }
+    return codecClass;
   }
   
   public abstract RecordWriter<K, V> getRecordWriter(FileSystem ignored,

+ 25 - 1
src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java

@@ -46,7 +46,7 @@ public class SequenceFileOutputFormat extends OutputFormatBase {
     CompressionType compressionType = CompressionType.NONE;
     if (getCompressOutput(job)) {
       // find the kind of compression to do
-      compressionType = SequenceFile.getCompressionType(job);
+      compressionType = getOutputCompressionType(job);
 
       // find the right codec
       Class codecClass = getOutputCompressorClass(job, DefaultCodec.class);
@@ -88,5 +88,29 @@ public class SequenceFileOutputFormat extends OutputFormatBase {
     }
     return parts;
   }
+
+  /**
+   * Get the {@link CompressionType} for the output {@link SequenceFile}.
+   * @param conf the {@link JobConf}
+   * @return the {@link CompressionType} for the output {@link SequenceFile}, 
+   *         defaulting to {@link CompressionType#RECORD}
+   */
+  public static CompressionType getOutputCompressionType(JobConf conf) {
+    String val = conf.get("mapred.output.compression.type", 
+                          CompressionType.RECORD.toString());
+    return CompressionType.valueOf(val);
+  }
+  
+  /**
+   * Set the {@link CompressionType} for the output {@link SequenceFile}.
+   * @param conf the {@link JobConf} to modify
+   * @param style the {@link CompressionType} for the output
+   *              {@link SequenceFile} 
+   */
+  public static void setOutputCompressionType(JobConf conf, 
+		                                          CompressionType style) {
+    conf.set("mapred.output.compression.type", style.toString());
+  }
+
 }
 

+ 1 - 2
src/test/org/apache/hadoop/mapred/TestMapRed.java

@@ -259,8 +259,7 @@ public class TestMapRed extends TestCase {
       
     public void configure(JobConf conf) {
       this.conf = conf;
-      compressInput = conf.getBoolean("mapred.compress.map.output", 
-                                      false);
+      compressInput = conf.getCompressMapOutput();
       taskId = conf.get("mapred.task.id");
     }