HADOOP-359. Permit map output to be compressed. Contributed by Owen.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@421249 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 19 years ago
parent commit fba4e46943

+ 3 - 0
CHANGES.txt

@@ -24,6 +24,9 @@ Trunk (unreleased changes)
  6. HADOOP-327.  Fix ToolBase to not call System.exit() when
     exceptions are thrown.  (Hairong Kuang via cutting)
 
+ 7. HADOOP-359.  Permit map output to be compressed.
+   (omalley via cutting)
+
 
 Release 0.4.0 - 2006-06-28
 

+ 8 - 0
conf/hadoop-default.xml

@@ -363,6 +363,14 @@ creations/deletions), or "all".</description>
   </description>
 </property>
 
+<property>
+  <name>mapred.compress.map.output</name>
+  <value>false</value>
+  <description>Should the outputs of the maps be compressed before being
+               sent across the network? Uses SequenceFile compression.
+  </description>
+</property>
+
 <!-- ipc properties -->
 
 <property>

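For context, a job can opt in either by setting the raw property or through the JobConf accessor added later in this commit. A minimal sketch, assuming the rest of the job (mapper, paths, output types) is configured elsewhere:

    // Sketch: turning on map-output compression for a job.
    JobConf job = new JobConf();
    job.setBoolean("mapred.compress.map.output", true);  // raw property
    // ...or, equivalently, the typed accessor this commit adds to JobConf:
    job.setCompressMapOutput(true);
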
+ 23 - 1
src/java/org/apache/hadoop/mapred/JobConf.java

@@ -20,6 +20,8 @@ package org.apache.hadoop.mapred;
 import java.io.IOException;
 import java.io.File;
 
+import java.lang.reflect.Constructor;
+
 import java.util.StringTokenizer;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -290,6 +292,22 @@ public class JobConf extends Configuration {
     setClass("mapred.input.value.class", theClass, Writable.class);
   }
   
+  /**
+   * Should the map outputs be compressed before transfer?
+   * Uses SequenceFile compression.
+   */
+  public void setCompressMapOutput(boolean compress) {
+    setBoolean("mapred.compress.map.output", compress);
+  }
+  
+  /**
+   * Are the outputs of the maps compressed?
+   * @return true if the map outputs are compressed
+   */
+  public boolean getCompressMapOutput() {
+    return getBoolean("mapred.compress.map.output", false);
+  }
+  
   /**
    * Get the key class for the map output data. If it is not set, use the
   * (final) output key class. This allows the map output key class to be
@@ -448,10 +466,14 @@ public class JobConf extends Configuration {
     set("mapred.job.name", name);
   }
   
+  private static final Class[] emptyArray = new Class[]{};
+  
   public Object newInstance(Class theClass) {
     Object result;
     try {
-      result = theClass.newInstance();
+      Constructor meth = theClass.getDeclaredConstructor(emptyArray);
+      meth.setAccessible(true);
+      result = meth.newInstance(emptyArray);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }

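The newInstance() change matters because Class.newInstance() cannot invoke a private no-argument constructor, while the reflective form with setAccessible(true) can; the new test below relies on this to instantiate its private static MyMap class. A standalone sketch of the same pattern, where MyThing stands in for any class with a private no-arg constructor (checked-exception handling elided):

    // Sketch: instantiating a class whose no-arg constructor is private.
    // Class.newInstance() would throw IllegalAccessException here.
    Constructor ctor = MyThing.class.getDeclaredConstructor(new Class[]{});
    ctor.setAccessible(true);                     // bypass the access check
    Object instance = ctor.newInstance(new Object[]{});
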
+ 15 - 9
src/java/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -86,9 +86,11 @@ class LocalJobRunner implements JobSubmissionProtocol {
           mapIds.add("map_" + newId());
           MapTask map = new MapTask(jobId, file, (String)mapIds.get(i), i,
                                     splits[i]);
-          map.setConf(job);
+          JobConf localConf = new JobConf(job);
+          map.localizeConfiguration(localConf);
+          map.setConf(localConf);
           map_tasks += 1;
-          map.run(job, this);
+          map.run(localConf, this);
           map_tasks -= 1;
         }
 
@@ -105,12 +107,16 @@ class LocalJobRunner implements JobSubmissionProtocol {
         }
 
         // run a single reduce task
-        ReduceTask reduce = new ReduceTask(jobId, file, 
-                                           reduceId, 0, mapIds.size());
-        reduce.setConf(job);
-        reduce_tasks += 1;
-        reduce.run(job, this);
-        reduce_tasks -= 1;
+        {
+          ReduceTask reduce = new ReduceTask(jobId, file, 
+                                             reduceId, 0, mapIds.size());
+          JobConf localConf = new JobConf(job);
+          reduce.localizeConfiguration(localConf);
+          reduce.setConf(localConf);
+          reduce_tasks += 1;
+          reduce.run(localConf, this);
+          reduce_tasks -= 1;
+        }
         this.mapoutputFile.removeAll(reduceId);
         
         this.status.runState = JobStatus.SUCCEEDED;
@@ -211,5 +217,5 @@ class LocalJobRunner implements JobSubmissionProtocol {
     return new ClusterStatus(1, map_tasks, reduce_tasks, 1);
   }
 
-  public JobStatus[] jobsToComplete() {return null;};
+  public JobStatus[] jobsToComplete() {return null;}
 }

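The pattern above gives each task a private copy of the job configuration, so task-specific values (the new test reads mapred.task.id and mapred.task.partition, which localizeConfiguration() fills in) cannot leak between tasks that previously shared the LocalJobRunner's single JobConf. Schematically, with task and umbilical assumed to be in scope:

    // Sketch of the per-task configuration pattern introduced above.
    JobConf localConf = new JobConf(job);   // a copy, not a shared reference
    task.localizeConfiguration(localConf);  // fills in task-specific keys
    task.setConf(localConf);
    task.run(localConf, umbilical);
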
+ 6 - 2
src/java/org/apache/hadoop/mapred/MapTask.java

@@ -81,12 +81,16 @@ class MapTask extends Task {
     final int partitions = job.getNumReduceTasks();
     final SequenceFile.Writer[] outs = new SequenceFile.Writer[partitions];
     try {
+      FileSystem localFs = FileSystem.getNamed("local", job);
+      boolean compressTemps = job.getBoolean("mapred.compress.map.output", 
+                                             false);
       for (int i = 0; i < partitions; i++) {
         outs[i] =
-          new SequenceFile.Writer(FileSystem.getNamed("local", job),
+          new SequenceFile.Writer(localFs,
                                   this.mapOutputFile.getOutputFile(getTaskId(), i),
                                   job.getMapOutputKeyClass(),
-                                  job.getMapOutputValueClass());
+                                  job.getMapOutputValueClass(),
+                                  compressTemps);
       }
 
       final Partitioner partitioner =

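On the write side this is the whole feature: the extra boolean on the SequenceFile.Writer constructor selects compression. A sketch of writing a compressed file and verifying the flag, mirroring the assertions the new test performs (fs, path, and conf are assumed to be initialized):

    // Sketch: write a compressed SequenceFile, then confirm the reader
    // sees it as compressed, as TestMapRed below does.
    SequenceFile.Writer out =
        new SequenceFile.Writer(fs, path, UTF8.class, UTF8.class, true);
    out.append(new UTF8("key"), new UTF8("value"));
    out.close();
    SequenceFile.Reader in = new SequenceFile.Reader(fs, path, conf);
    assertTrue(in.isCompressed());
    in.close();
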
+ 1 - 1
src/java/org/apache/hadoop/mapred/ReduceTask.java

@@ -258,7 +258,7 @@ class ReduceTask extends Task {
     NUMBER_FORMAT.setGroupingUsed(false);
   }
 
-  private static synchronized String getOutputName(int partition) {
+  static synchronized String getOutputName(int partition) {
     return "part-" + NUMBER_FORMAT.format(partition);
   }
 

+ 132 - 1
src/test/org/apache/hadoop/mapred/TestMapRed.java

@@ -18,6 +18,7 @@ package org.apache.hadoop.mapred;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.conf.*;
+import org.apache.hadoop.mapred.lib.*;
 import junit.framework.TestCase;
 import java.io.*;
 import java.util.*;
@@ -213,9 +214,139 @@ public class TestMapRed extends TestCase {
     **/
 
     public void testMapred() throws Exception {
-	launch();
+      launch();
     }
 
+    private static class MyMap implements Mapper {
+      private JobConf conf;
+      private boolean compress;
+      private String taskId;
+      
+      public void configure(JobConf conf) {
+        this.conf = conf;
+        compress = conf.getBoolean("mapred.compress.map.output", false);
+        taskId = conf.get("mapred.task.id");
+      }
+      
+      public void map(WritableComparable key, Writable value,
+                      OutputCollector output, Reporter reporter
+                      ) throws IOException {
+        String str = ((UTF8) value).toString().toLowerCase();
+        output.collect(new UTF8(str), value);
+      }
+
+      public void close() throws IOException {
+        MapOutputFile namer = new MapOutputFile();
+        namer.setConf(conf);
+        FileSystem fs = FileSystem.get(conf);
+        Path output = namer.getOutputFile(taskId, 0);
+        assertTrue("map output exists " + output, fs.exists(output));
+        SequenceFile.Reader rdr = 
+          new SequenceFile.Reader(fs, output, conf);
+        assertEquals("is map output compressed " + output, compress, 
+                     rdr.isCompressed());
+        rdr.close();
+      }
+    }
+    
+    private static class MyReduce extends IdentityReducer {
+      private JobConf conf;
+      private boolean compressInput;
+      private boolean compressOutput;
+      private String taskId;
+      private int partition;
+      private boolean first = true;
+      
+      public void configure(JobConf conf) {
+        this.conf = conf;
+        compressInput = conf.getBoolean("mapred.compress.map.output", 
+                                        false);
+        compressOutput = conf.getBoolean("mapred.output.compress",
+                                         false);
+        taskId = conf.get("mapred.task.id");
+        partition = conf.getInt("mapred.task.partition", -1);
+      }
+      
+      public void reduce(WritableComparable key, Iterator values,
+                         OutputCollector output, Reporter reporter
+                        ) throws IOException {
+        if (first) {
+          first = false;
+          Path input = conf.getLocalPath(taskId+"/all.2");
+          FileSystem fs = FileSystem.get(conf);
+          assertTrue("reduce input exists " + input, fs.exists(input));
+          SequenceFile.Reader rdr = 
+            new SequenceFile.Reader(fs, input, conf);
+          assertEquals("is reduce input compressed " + input, 
+                       compressInput, 
+                       rdr.isCompressed());
+          rdr.close();          
+        }
+      }
+      
+    }
+    
+    private void checkCompression(boolean compressMapOutput,
+                                  boolean compressReduceOutput,
+                                  boolean includeCombine
+                                  ) throws Exception {
+      JobConf conf = new JobConf();
+      Path testdir = new Path("build/test/test.mapred.compress");
+      Path inDir = new Path(testdir, "in");
+      Path outDir = new Path(testdir, "out");
+      FileSystem fs = FileSystem.get(conf);
+      conf.setInputPath(inDir);
+      conf.setOutputPath(outDir);
+      conf.setMapperClass(MyMap.class);
+      conf.setReducerClass(MyReduce.class);
+      conf.setOutputKeyClass(UTF8.class);
+      conf.setOutputValueClass(UTF8.class);
+      conf.setOutputFormat(SequenceFileOutputFormat.class);
+      if (includeCombine) {
+        conf.setCombinerClass(IdentityReducer.class);
+      }
+      if (compressMapOutput) {
+        conf.setBoolean("mapred.compress.map.output", true);
+      }
+      if (compressReduceOutput) {
+        conf.setBoolean("mapred.output.compress", true);
+      }
+      try {
+        fs.mkdirs(testdir);
+        fs.mkdirs(inDir);
+        Path inFile = new Path(inDir, "part0");
+        DataOutputStream f = fs.create(inFile);
+        f.writeBytes("Owen was here\n");
+        f.writeBytes("Hadoop is fun\n");
+        f.writeBytes("Is this done, yet?\n");
+        f.close();
+        JobClient.runJob(conf);
+        Path output = new Path(outDir,
+                               ReduceTask.getOutputName(0));
+        assertTrue("reduce output exists " + output, fs.exists(output));
+        SequenceFile.Reader rdr = 
+            new SequenceFile.Reader(fs, output, conf);
+        assertEquals("is reduce output compressed " + output, 
+                     compressReduceOutput, 
+                     rdr.isCompressed());
+        rdr.close();
+      } finally {
+        fs.delete(testdir);
+      }
+    }
+    
+    public void testCompression() throws Exception {
+      for(int compressMap=0; compressMap < 2; ++compressMap) {
+        for(int compressOut=0; compressOut < 2; ++compressOut) {
+          for(int combine=0; combine < 2; ++combine) {
+            checkCompression(compressMap == 1, compressOut == 1,
+                             combine == 1);
+          }
+        }
+      }
+    }
+    
+    
     /**
      * 
      */