HADOOP-54. Add block compression to SequenceFile. Contributed by Arun.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@437791 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting
commit 1f90f632f4

+ 6 - 0
CHANGES.txt

@@ -74,6 +74,12 @@ Trunk (unreleased changes)
 
 18. HADOOP-461.  Make Java 1.5 an explicit requirement.  (cutting)
 
+19. HADOOP-54.  Add block compression to SequenceFile.  One may now
+    specify that blocks of keys and values are compressed together,
+    improving compression for small keys and values.
+    SequenceFile.Writer's constructor is now deprecated and replaced
+    with a factory method.  (Arun C Murthy via cutting)
+
 
 Release 0.5.0 - 2006-08-04
 

+ 31 - 0
conf/hadoop-default.xml

@@ -396,6 +396,37 @@ creations/deletions), or "all".</description>
   </description>
 </property>
 
+<property>
+  <name>mapred.seqfile.compress.blocksize</name>
+  <value>1000000</value>
+  <description>The minimum block size for compression in block compressed 
+  				SequenceFiles.
+  </description>
+</property>
+
+<property>
+  <name>mapred.seqfile.lazydecompress</name>
+  <value>true</value>
+  <description>Should values of block-compressed SequenceFiles be decompressed
+  				only when necessary.
+  </description>
+</property>
+
+<property>
+  <name>mapred.seqfile.sorter.recordlimit</name>
+  <value>1000000</value>
+  <description>The limit on number of records to be kept in memory in a spill 
+  				in SequenceFiles.Sorter
+  </description>
+</property>
+
+<property>
+  <name>mapred.seqfile.compression.type</name>
+  <value>NONE</value>
+  <description>The default compression type for SequenceFile.Writer.
+  </description>
+</property>
+
 <!-- ipc properties -->
 
 <property>
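
These four properties can also be set programmatically before writers or sorters are created. A hedged sketch of overriding the defaults above (the values are illustrative; only the property names come from this change):

    Configuration conf = new Configuration();
    // Minimum number of uncompressed bytes buffered before a block is compressed.
    conf.set("mapred.seqfile.compress.blocksize", "4000000");
    // Defer decompressing values until they are actually asked for.
    conf.set("mapred.seqfile.lazydecompress", "true");
    // Cap on records held in memory per spill by the SequenceFile sorter.
    conf.set("mapred.seqfile.sorter.recordlimit", "500000");
    // Default compression type for SequenceFile.Writer (NONE, RECORD, or BLOCK).
    conf.set("mapred.seqfile.compression.type", "BLOCK");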

+ 4 - 2
src/examples/org/apache/hadoop/examples/NNBench.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
@@ -184,9 +185,10 @@ public class NNBench extends MapReduceBase implements Reducer {
 
     for(int i=0; i < numMaps; ++i) {
       Path file = new Path(inDir, "part"+i);
-      SequenceFile.Writer writer = new SequenceFile.Writer(fileSys,
-                                file,
+      SequenceFile.Writer writer = SequenceFile.createWriter(fileSys,
+                                jobConf, file,
                                 IntWritable.class, IntWritable.class,
+                                CompressionType.NONE,
                                 null);
       writer.append(new IntWritable(0), new IntWritable(filesPerMap));
       writer.close();

+ 6 - 4
src/examples/org/apache/hadoop/examples/PiBenchmark.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.mapred.*;
 
 /**
@@ -125,8 +126,9 @@ public class PiBenchmark {
         Path outDir = new Path(tmpDir, "out");
         Path outFile = new Path(outDir, "reduce-out");
         FileSystem fileSys = FileSystem.get(conf);
-        SequenceFile.Writer writer = new SequenceFile.Writer(fileSys, outFile,
-              LongWritable.class, LongWritable.class);
+        SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf, 
+            outFile, LongWritable.class, LongWritable.class, 
+            CompressionType.NONE);
         writer.append(new LongWritable(numInside), new LongWritable(numOutside));
         writer.close();
       }
@@ -173,8 +175,8 @@ public class PiBenchmark {
     
     for(int idx=0; idx < numMaps; ++idx) {
       Path file = new Path(inDir, "part"+idx);
-      SequenceFile.Writer writer = new SequenceFile.Writer(fileSys, file,
-              LongWritable.class, LongWritable.class);
+      SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, jobConf, 
+          file, LongWritable.class, LongWritable.class, CompressionType.NONE);
       writer.append(new LongWritable(numPoints), new LongWritable(0));
       writer.close();
       System.out.println("Wrote input for Map #"+idx);

+ 5 - 2
src/examples/org/apache/hadoop/examples/RandomWriter.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
@@ -51,6 +52,7 @@ public class RandomWriter extends MapReduceBase implements Reducer {
   
   public static class Map extends MapReduceBase implements Mapper {
     private FileSystem fileSys = null;
+    private JobConf jobConf = null;
     private long numBytesToWrite;
     private int minKeySize;
     private int keySizeRange;
@@ -75,9 +77,9 @@ public class RandomWriter extends MapReduceBase implements Reducer {
                     Reporter reporter) throws IOException {
       String filename = ((Text) value).toString();
       SequenceFile.Writer writer = 
-        new SequenceFile.Writer(fileSys, new Path(filename), 
+        SequenceFile.createWriter(fileSys, jobConf, new Path(filename), 
                                 BytesWritable.class, BytesWritable.class,
-                                reporter);
+                                CompressionType.NONE, reporter);
       int itemCount = 0;
       while (numBytesToWrite > 0) {
         int keyLength = minKeySize + 
@@ -104,6 +106,7 @@ public class RandomWriter extends MapReduceBase implements Reducer {
      * the data.
      */
     public void configure(JobConf job) {
+      jobConf = job;
       try {
         fileSys = FileSystem.get(job);
       } catch (IOException e) {

+ 855 - 111
src/java/org/apache/hadoop/io/SequenceFile.java

File diff suppressed because it is too large.

+ 10 - 3
src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.Progressable;
 
@@ -38,11 +39,17 @@ public class SequenceFileOutputFormat extends OutputFormatBase {
 
     Path file = new Path(job.getOutputPath(), name);
 
-    final SequenceFile.Writer out =
-      new SequenceFile.Writer(fs, file,
+    /** TODO: Figure out a way to deprecate 'mapred.output.compress' */
+    final SequenceFile.Writer out = 
+      SequenceFile.createWriter(fs, job, file,
                               job.getOutputKeyClass(),
                               job.getOutputValueClass(),
-                              job.getBoolean("mapred.output.compress", false),
+                              job.getBoolean("mapred.output.compress", false) ? 
+                                  CompressionType.RECORD : 
+                                  CompressionType.valueOf(
+                                    job.get("mapred.seqfile.compression.type", 
+                                        "NONE")
+                                  ),
                               progress);
 
     return new RecordWriter() {
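
In other words, setting the legacy boolean still forces RECORD compression for compatibility; otherwise the new per-SequenceFile property decides the type. A sketch of a job that opts into block-compressed output (only the property name and the two classes referenced in this hunk are taken from the source; the rest is illustrative):

    JobConf job = new JobConf();
    job.setOutputFormat(SequenceFileOutputFormat.class);
    // Leave mapred.output.compress unset (or false) so this property is honored.
    job.set("mapred.seqfile.compression.type", "BLOCK");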

+ 4 - 2
src/test/org/apache/hadoop/fs/DFSCIOTest.java

@@ -26,6 +26,7 @@ import org.apache.commons.logging.*;
 
 import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.conf.*;
 
 /**
@@ -122,8 +123,9 @@ public class DFSCIOTest extends TestCase {
       Path controlFile = new Path(CONTROL_DIR, "in_file_" + name);
       SequenceFile.Writer writer = null;
       try {
-        writer = new SequenceFile.Writer(fs, controlFile,
-                                         UTF8.class, LongWritable.class);
+        writer = SequenceFile.createWriter(fs, fsConfig, controlFile,
+                                         UTF8.class, LongWritable.class,
+                                         CompressionType.NONE);
         writer.append(new UTF8(name), new LongWritable(fileSize));
       } catch(Exception e) {
         throw new IOException(e.getLocalizedMessage());

+ 3 - 1
src/test/org/apache/hadoop/fs/DistributedFSCheck.java

@@ -28,6 +28,7 @@ import org.apache.commons.logging.*;
 
 import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.conf.*;
 
 /**
@@ -89,7 +90,8 @@ public class DistributedFSCheck extends TestCase {
 
     Path inputFile = new Path(MAP_INPUT_DIR, "in_file");
     SequenceFile.Writer writer =
-      new SequenceFile.Writer(fs, inputFile, UTF8.class, LongWritable.class);
+      SequenceFile.createWriter(fs, fsConfig, inputFile, 
+          UTF8.class, LongWritable.class, CompressionType.NONE);
     
     try {
       nrFiles = 0;

+ 4 - 2
src/test/org/apache/hadoop/fs/TestDFSIO.java

@@ -26,6 +26,7 @@ import org.apache.commons.logging.*;
 
 import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.conf.*;
 
 /**
@@ -116,8 +117,9 @@ public class TestDFSIO extends TestCase {
       Path controlFile = new Path(CONTROL_DIR, "in_file_" + name);
       SequenceFile.Writer writer = null;
       try {
-        writer = new SequenceFile.Writer(fs, controlFile,
-                                         UTF8.class, LongWritable.class);
+        writer = SequenceFile.createWriter(fs, fsConfig, controlFile,
+                                         UTF8.class, LongWritable.class,
+                                         CompressionType.NONE);
         writer.append(new UTF8(name), new LongWritable(fileSize));
       } catch(Exception e) {
         throw new IOException(e.getLocalizedMessage());

+ 3 - 1
src/test/org/apache/hadoop/fs/TestFileSystem.java

@@ -25,6 +25,7 @@ import org.apache.commons.logging.*;
 import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.mapred.lib.*;
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.conf.*;
 
 public class TestFileSystem extends TestCase {
@@ -77,7 +78,8 @@ public class TestFileSystem extends TestCase {
     Random random = new Random(seed);
 
     SequenceFile.Writer writer =
-      new SequenceFile.Writer(fs, controlFile, UTF8.class, LongWritable.class);
+      SequenceFile.createWriter(fs, conf, controlFile, 
+          UTF8.class, LongWritable.class, CompressionType.NONE);
 
     long totalSize = 0;
     long maxSize = ((megaBytes / numFiles) * 2) + 1;

+ 119 - 39
src/test/org/apache/hadoop/io/TestSequenceFile.java

@@ -23,6 +23,7 @@ import junit.framework.TestCase;
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.conf.*;
 
 
@@ -40,13 +41,19 @@ public class TestSequenceFile extends TestCase {
     int megabytes = 1;
     int factor = 5;
     Path file = new Path(System.getProperty("test.build.data",".")+"/test.seq");
+    Path recordCompressedFile = 
+      new Path(System.getProperty("test.build.data",".")+"/test.rc.seq");
+    Path blockCompressedFile = 
+      new Path(System.getProperty("test.build.data",".")+"/test.bc.seq");
  
     int seed = new Random().nextInt();
 
-    FileSystem fs = new LocalFileSystem(new Configuration());
+    FileSystem fs = new LocalFileSystem(conf);
     try {
         //LOG.setLevel(Level.FINE);
-        writeTest(fs, count, seed, file, false);
+
+        // SequenceFile.Writer
+        writeTest(fs, count, seed, file, CompressionType.NONE);
         readTest(fs, count, seed, file);
 
         sortTest(fs, count, megabytes, factor, false, file);
@@ -55,24 +62,63 @@ public class TestSequenceFile extends TestCase {
         sortTest(fs, count, megabytes, factor, true, file);
         checkSort(fs, count, seed, file);
 
-        mergeTest(fs, count, seed, file, false, factor, megabytes);
+        mergeTest(fs, count, seed, file, CompressionType.NONE, false, 
+            factor, megabytes);
         checkSort(fs, count, seed, file);
 
-        mergeTest(fs, count, seed, file, true, factor, megabytes);
+        mergeTest(fs, count, seed, file, CompressionType.NONE, true, 
+            factor, megabytes);
         checkSort(fs, count, seed, file);
-    } finally {
+        
+        // SequenceFile.RecordCompressWriter
+        writeTest(fs, count, seed, recordCompressedFile, CompressionType.RECORD);
+        readTest(fs, count, seed, recordCompressedFile);
+
+        sortTest(fs, count, megabytes, factor, false, recordCompressedFile);
+        checkSort(fs, count, seed, recordCompressedFile);
+
+        sortTest(fs, count, megabytes, factor, true, recordCompressedFile);
+        checkSort(fs, count, seed, recordCompressedFile);
+
+        mergeTest(fs, count, seed, recordCompressedFile, 
+            CompressionType.RECORD, false, factor, megabytes);
+        checkSort(fs, count, seed, recordCompressedFile);
+
+        mergeTest(fs, count, seed, recordCompressedFile, 
+            CompressionType.RECORD, true, factor, megabytes);
+        checkSort(fs, count, seed, recordCompressedFile);
+        
+        // SequenceFile.BlockCompressWriter
+        writeTest(fs, count, seed, blockCompressedFile, CompressionType.BLOCK);
+        readTest(fs, count, seed, blockCompressedFile);
+
+        sortTest(fs, count, megabytes, factor, false, blockCompressedFile);
+        checkSort(fs, count, seed, blockCompressedFile);
+
+        sortTest(fs, count, megabytes, factor, true, blockCompressedFile);
+        checkSort(fs, count, seed, blockCompressedFile);
+
+        mergeTest(fs, count, seed, blockCompressedFile, CompressionType.BLOCK, 
+            false, factor, megabytes);
+        checkSort(fs, count, seed, blockCompressedFile);
+
+        mergeTest(fs, count, seed, blockCompressedFile, CompressionType.BLOCK, 
+            true, factor, megabytes);
+        checkSort(fs, count, seed, blockCompressedFile);
+
+        } finally {
         fs.close();
     }
   }
 
-  private static void writeTest(FileSystem fs, int count, int seed,
-                                Path file, boolean compress)
+  private static void writeTest(FileSystem fs, int count, int seed, Path file, 
+      CompressionType compressionType)
     throws IOException {
     fs.delete(file);
     LOG.debug("creating with " + count + " records");
-    SequenceFile.Writer writer =
-      new SequenceFile.Writer(fs, file, RandomDatum.class, RandomDatum.class,
-                              compress);
+    SequenceFile.Writer writer = 
+      SequenceFile.createWriter(fs, conf, file, 
+          RandomDatum.class, RandomDatum.class, compressionType);
     RandomDatum.Generator generator = new RandomDatum.Generator(seed);
     for (int i = 0; i < count; i++) {
       generator.next();
@@ -86,22 +132,39 @@ public class TestSequenceFile extends TestCase {
 
   private static void readTest(FileSystem fs, int count, int seed, Path file)
     throws IOException {
-    RandomDatum k = new RandomDatum();
-    RandomDatum v = new RandomDatum();
     LOG.debug("reading " + count + " records");
     SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, conf);
     RandomDatum.Generator generator = new RandomDatum.Generator(seed);
+
+    RandomDatum k = new RandomDatum();
+    RandomDatum v = new RandomDatum();
+    DataOutputBuffer rawKey = new DataOutputBuffer();
+    SequenceFile.ValueBytes rawValue = reader.createValueBytes();
+    
     for (int i = 0; i < count; i++) {
       generator.next();
       RandomDatum key = generator.getKey();
       RandomDatum value = generator.getValue();
-      
-      reader.next(k, v);
-      
-      if (!k.equals(key))
-        throw new RuntimeException("wrong key at " + i);
-      if (!v.equals(value))
-        throw new RuntimeException("wrong value at " + i);
+
+      if ((i%5) == 10) {
+        // Testing 'raw' apis
+        rawKey.reset();
+        reader.nextRaw(rawKey, rawValue);
+      } else {
+        // Testing 'non-raw' apis 
+        if ((i%2) == 0) {
+          reader.next(k);
+          reader.getCurrentValue(v);
+        } else {
+          reader.next(k, v);
+        }
+
+        // Sanity check
+        if (!k.equals(key))
+          throw new RuntimeException("wrong key at " + i);
+        if (!v.equals(value))
+          throw new RuntimeException("wrong value at " + i);
+      }
     }
     reader.close();
   }
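
The read side gains a matching split, which this test now exercises: next(key) advances to the next record without touching the value, getCurrentValue(value) decompresses and deserializes the value on demand (this is where lazy decompression pays off), and nextRaw copies the still-compressed bytes. A small sketch of the on-demand path, assuming a FileSystem fs, Configuration conf, and a Path file written with IntWritable keys and values (all illustrative):

    SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, conf);
    IntWritable key = new IntWritable();
    IntWritable value = new IntWritable();
    while (reader.next(key)) {             // reads and deserializes only the key
      if (key.get() % 100 == 0) {          // fetch the value only when needed
        reader.getCurrentValue(value);
        System.out.println(key + "\t" + value);
      }
    }
    reader.close();
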
@@ -152,9 +215,9 @@ public class TestSequenceFile extends TestCase {
     LOG.debug("sucessfully checked " + count + " records");
   }
 
-  private static void mergeTest(FileSystem fs, int count, int seed, 
-                                Path file, boolean fast, int factor, 
-                                int megabytes)
+  private static void mergeTest(FileSystem fs, int count, int seed, Path file, 
+                                CompressionType compressionType,
+                                boolean fast, int factor, int megabytes)
     throws IOException {
 
     LOG.debug("creating "+factor+" files with "+count/factor+" records");
@@ -168,8 +231,8 @@ public class TestSequenceFile extends TestCase {
       sortedNames[i] = names[i].suffix(".sorted");
       fs.delete(names[i]);
       fs.delete(sortedNames[i]);
-      writers[i] =
-        new SequenceFile.Writer(fs, names[i], RandomDatum.class,RandomDatum.class);
+      writers[i] = SequenceFile.createWriter(fs, conf, names[i], 
+          RandomDatum.class, RandomDatum.class, compressionType);
     }
 
     RandomDatum.Generator generator = new RandomDatum.Generator(seed);
@@ -215,21 +278,25 @@ public class TestSequenceFile extends TestCase {
     int megabytes = 1;
     int factor = 10;
     boolean create = true;
+    boolean rwonly = false;
     boolean check = false;
     boolean fast = false;
     boolean merge = false;
-    boolean compress = false;
+    String compressType = "NONE";
     Path file = null;
-    String usage = "Usage: SequenceFile (-local | -dfs <namenode:port>) [-count N] [-megabytes M] [-factor F] [-nocreate] [-check] [-fast] [-merge] [-compress] file";
-    
+
+    String usage = "Usage: SequenceFile (-local | -dfs <namenode:port>) " +
+        "[-count N] " + "[-check] [-compressType <NONE|RECORD|BLOCK>] " +
+        "[[-rwonly] | {[-megabytes M] [-factor F] [-nocreate] [-fast] [-merge]}] " +
+        " file";
     if (args.length == 0) {
         System.err.println(usage);
         System.exit(-1);
     }
-    int i = 0;
-    FileSystem fs = FileSystem.parseArgs(args, i, conf);      
+    
+    FileSystem fs = FileSystem.parseArgs(args, 0, conf);      
     try {
-      for (; i < args.length; i++) {       // parse command line
+      for (int i=0; i < args.length; ++i) {       // parse command line
           if (args[i] == null) {
               continue;
           } else if (args[i].equals("-count")) {
@@ -238,6 +305,8 @@ public class TestSequenceFile extends TestCase {
               megabytes = Integer.parseInt(args[++i]);
           } else if (args[i].equals("-factor")) {
               factor = Integer.parseInt(args[++i]);
+          } else if (args[i].equals("-rwonly")) {
+              rwonly = true;
           } else if (args[i].equals("-nocreate")) {
               create = false;
           } else if (args[i].equals("-check")) {
@@ -246,8 +315,8 @@ public class TestSequenceFile extends TestCase {
               fast = true;
           } else if (args[i].equals("-merge")) {
               merge = true;
-          } else if (args[i].equals("-compress")) {
-              compress = true;
+          } else if (args[i].equals("-compressType")) {
+              compressType = args[++i];
           } else {
               // file is required parameter
               file = new Path(args[i]);
@@ -257,23 +326,34 @@ public class TestSequenceFile extends TestCase {
         LOG.info("megabytes = " + megabytes);
         LOG.info("factor = " + factor);
         LOG.info("create = " + create);
+        LOG.info("rwonly = " + rwonly);
         LOG.info("check = " + check);
         LOG.info("fast = " + fast);
         LOG.info("merge = " + merge);
-        LOG.info("compress = " + compress);
+        LOG.info("compressType = " + compressType);
         LOG.info("file = " + file);
 
+        if (rwonly && (!create || merge || fast)) {
+          System.err.println(usage);
+          System.exit(-1);
+        }
+
         int seed = 0;
- 
-        if (create && !merge) {
-            writeTest(fs, count, seed, file, compress);
+        CompressionType compressionType = 
+          CompressionType.valueOf(compressType);
+
+        if (rwonly || (create && !merge)) {
+            writeTest(fs, count, seed, file, compressionType);
             readTest(fs, count, seed, file);
         }
 
-        if (merge) {
-            mergeTest(fs, count, seed, file, fast, factor, megabytes);
-        } else {
+        if (!rwonly) {
+          if (merge) {
+            mergeTest(fs, count, seed, file, compressionType, 
+                fast, factor, megabytes);
+          } else {
             sortTest(fs, count, megabytes, factor, fast, file);
+          }
         }
     
         if (check) {

+ 6 - 4
src/test/org/apache/hadoop/mapred/PiEstimator.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 
 /**
  * A Map-reduce program to estimate the value of Pi using monte-carlo
@@ -119,8 +120,9 @@ public class PiEstimator {
         Path outDir = new Path(tmpDir, "out");
         Path outFile = new Path(outDir, "reduce-out");
         FileSystem fileSys = FileSystem.get(conf);
-        SequenceFile.Writer writer = new SequenceFile.Writer(fileSys, outFile,
-              IntWritable.class, IntWritable.class);
+        SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
+            outFile, IntWritable.class, IntWritable.class, 
+            CompressionType.NONE);
         writer.append(new IntWritable(numInside), new IntWritable(numOutside));
         writer.close();
       }
@@ -169,8 +171,8 @@ public class PiEstimator {
     
     for(int idx=0; idx < numMaps; ++idx) {
       Path file = new Path(inDir, "part"+idx);
-      SequenceFile.Writer writer = new SequenceFile.Writer(fileSys, file,
-              IntWritable.class, IntWritable.class);
+      SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, jobConf, 
+          file, IntWritable.class, IntWritable.class, CompressionType.NONE);
       writer.append(new IntWritable(numPoints), new IntWritable(0));
       writer.close();
     }

+ 4 - 1
src/test/org/apache/hadoop/record/test/TestMapRed.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.record.test;
 import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.conf.*;
 import junit.framework.TestCase;
 import java.io.*;
@@ -251,7 +252,9 @@ public class TestMapRed extends TestCase {
         fs.mkdirs(randomIns);
 
         File answerkey = new File(randomIns, "answer.key");
-        SequenceFile.Writer out = new SequenceFile.Writer(fs, answerkey.getPath(), RecInt.class, RecInt.class);
+        SequenceFile.Writer out = SequenceFile.createWriter(fs, conf, 
+            new Path(answerkey.getPath()), RecInt.class, RecInt.class, 
+            CompressionType.NONE);
         try {
             for (int i = 0; i < range; i++) {
                 RecInt k = new RecInt();

Some files were not shown because too many files changed in this diff