1
0
Browse Source

HADOOP-2943. Compression of intermediate map output causes failures in the
merge.



git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@635156 13f79535-47bb-0310-9956-ffa450edef68

Christopher Douglas 17 years ago
parent
commit
0c2c5de914

+ 4 - 1
CHANGES.txt

@@ -150,7 +150,10 @@ Trunk (unreleased changes)
 
     HADOOP-2938. Some fs commands did not glob paths.
     (Tsz Wo (Nicholas), SZE via rangadi)
-    
+
+    HADOOP-2943. Compression of intermediate map output causes failures
+    in the merge. (cdouglas)
+
 Release 0.16.1 - 2008-03-13
 
   INCOMPATIBLE CHANGES

+ 16 - 10
src/java/org/apache/hadoop/io/SequenceFile.java

@@ -795,9 +795,9 @@ public class SequenceFile {
     Metadata metadata = null;
     Compressor compressor = null;
     
-    private Serializer keySerializer;
-    private Serializer uncompressedValSerializer;
-    private Serializer compressedValSerializer;
+    protected Serializer keySerializer;
+    protected Serializer uncompressedValSerializer;
+    protected Serializer compressedValSerializer;
     
     // Insert a globally unique 16-byte value every few entries, so that one
     // can seek into the middle of a file and then synchronize with record
@@ -1111,7 +1111,8 @@ public class SequenceFile {
     boolean isBlockCompressed() { return false; }
 
     /** Append a key/value pair. */
-    public synchronized void append(Writable key, Writable val)
+    @SuppressWarnings("unchecked")
+    public synchronized void append(Object key, Object val)
       throws IOException {
       if (key.getClass() != keyClass)
         throw new IOException("wrong key class: "+key.getClass().getName()
@@ -1123,14 +1124,14 @@ public class SequenceFile {
       buffer.reset();
 
       // Append the 'key'
-      key.write(buffer);
+      keySerializer.serialize(key);
       int keyLength = buffer.getLength();
       if (keyLength == 0)
         throw new IOException("zero length keys not allowed: " + key);
 
       // Compress 'value' and append it
       deflateFilter.resetState();
-      val.write(deflateOut);
+      compressedValSerializer.serialize(val);
       deflateOut.flush();
       deflateFilter.finish();
 
@@ -1238,8 +1239,12 @@ public class SequenceFile {
     boolean isBlockCompressed() { return true; }
 
     /** Initialize */
-    void init(int compressionBlockSize) {
+    void init(int compressionBlockSize) throws IOException {
       this.compressionBlockSize = compressionBlockSize;
+      keySerializer.close();
+      keySerializer.open(keyBuffer);
+      uncompressedValSerializer.close();
+      uncompressedValSerializer.open(valBuffer);
     }
     
     /** Workhorse to check and write out compressed data/lengths */
@@ -1295,7 +1300,8 @@ public class SequenceFile {
     }
 
     /** Append a key/value pair. */
-    public synchronized void append(Writable key, Writable val)
+    @SuppressWarnings("unchecked")
+    public synchronized void append(Object key, Object val)
       throws IOException {
       if (key.getClass() != keyClass)
         throw new IOException("wrong key class: "+key+" is not "+keyClass);
@@ -1304,14 +1310,14 @@ public class SequenceFile {
 
       // Save key/value into respective buffers 
       int oldKeyLength = keyBuffer.getLength();
-      key.write(keyBuffer);
+      keySerializer.serialize(key);
       int keyLength = keyBuffer.getLength() - oldKeyLength;
       if (keyLength == 0)
         throw new IOException("zero length keys not allowed: " + key);
       WritableUtils.writeVInt(keyLenBuffer, keyLength);
 
       int oldValLength = valBuffer.getLength();
-      val.write(valBuffer);
+      uncompressedValSerializer.serialize(val);
       int valLength = valBuffer.getLength() - oldValLength;
       WritableUtils.writeVInt(valLenBuffer, valLength);
       

+ 12 - 12
src/test/org/apache/hadoop/mapred/TestMapRed.java

@@ -24,6 +24,7 @@ import junit.framework.TestCase;
 import java.io.*;
 import java.util.*;
 
+import static org.apache.hadoop.io.SequenceFile.CompressionType;
 
 /**********************************************************
  * MapredLoadTest generates a bunch of work that exercises
@@ -282,8 +283,8 @@ public class TestMapRed extends TestCase {
       
   }
     
-  private void checkCompression(boolean compressMapOutput,
-                                boolean compressReduceOutput,
+  private void checkCompression(CompressionType mapCompression,
+                                CompressionType redCompression,
                                 boolean includeCombine
                                 ) throws Exception {
     JobConf conf = new JobConf(TestMapRed.class);
@@ -302,12 +303,9 @@ public class TestMapRed extends TestCase {
     if (includeCombine) {
       conf.setCombinerClass(IdentityReducer.class);
     }
-    if (compressMapOutput) {
-      conf.setCompressMapOutput(true);
-    }
-    if (compressReduceOutput) {
-      SequenceFileOutputFormat.setCompressOutput(conf, true);
-    }
+    conf.setMapOutputCompressionType(mapCompression);
+    conf.setCompressMapOutput(mapCompression != CompressionType.NONE);
+    SequenceFileOutputFormat.setOutputCompressionType(conf, redCompression);
     try {
       if (!fs.mkdirs(testdir)) {
         throw new IOException("Mkdirs failed to create " + testdir.toString());
@@ -330,7 +328,7 @@ public class TestMapRed extends TestCase {
       SequenceFile.Reader rdr = 
         new SequenceFile.Reader(fs, output, conf);
       assertEquals("is reduce output compressed " + output, 
-                   compressReduceOutput, 
+                   redCompression != CompressionType.NONE, 
                    rdr.isCompressed());
       rdr.close();
     } finally {
@@ -339,10 +337,12 @@ public class TestMapRed extends TestCase {
   }
     
   public void testCompression() throws Exception {
-    for(int compressMap=0; compressMap < 2; ++compressMap) {
-      for(int compressOut=0; compressOut < 2; ++compressOut) {
+    EnumSet<SequenceFile.CompressionType> seq =
+      EnumSet.allOf(SequenceFile.CompressionType.class);
+    for (CompressionType mapCompression : seq) {
+      for (CompressionType redCompression : seq) {
         for(int combine=0; combine < 2; ++combine) {
-          checkCompression(compressMap == 1, compressOut == 1,
+          checkCompression(mapCompression, redCompression,
                            combine == 1);
         }
       }