
HADOOP-522. Permit block compression with MapFile and SetFile.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@452624 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 years ago
parent
commit
53b7ecdcf3

+ 4 - 0
CHANGES.txt

@@ -112,6 +112,10 @@ Trunk (unreleased changes)
    stream. Implement an optimized version for DFS and local FS.
    (Milind Bhandarkar via cutting)

+28. HADOOP-522. Permit block compression with MapFile and SetFile.
+    Since these formats are always sorted, block compression can
+    provide a big advantage.  (cutting)
+

Release 0.6.2 - 2006-09-18


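For orientation, here is a minimal sketch of what the new API enables. The path "example.map" and the LongWritable/UTF8 key and value classes are illustrative only, not part of this change:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.MapFile;
    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.io.UTF8;

    public class BlockCompressedMapFile {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        // New in this change: a MapFile.Writer that takes a CompressionType.
        MapFile.Writer writer =
          new MapFile.Writer(conf, fs, "example.map",
                             LongWritable.class, UTF8.class,
                             CompressionType.BLOCK);
        for (long i = 0; i < 100; i++)            // keys must arrive in order
          writer.append(new LongWritable(i), new UTF8("value-" + i));
        writer.close();

        // Reading is unchanged; the reader detects the compression type.
        MapFile.Reader reader = new MapFile.Reader(fs, "example.map", conf);
        UTF8 value = new UTF8();
        reader.get(new LongWritable(42), value);  // index lookup, then scan
        reader.close();
      }
    }

Since MapFile keys are always written in sorted order, block compression can exploit the redundancy between neighbouring keys, which is the advantage the changelog entry refers to.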
+ 35 - 17
src/java/org/apache/hadoop/io/MapFile.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.io;
 import java.io.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.conf.*;
+import org.apache.hadoop.io.SequenceFile.CompressionType;

 /** A file-based map from keys to values.
  * 
@@ -69,12 +70,20 @@ public class MapFile {
      this(fs, dirName, WritableComparator.get(keyClass), valClass, false);
    }

-    /** Create the named map for keys of the named class. */
+    /** Create the named map for keys of the named class.
+     * @deprecated specify a {@link CompressionType} instead
+     */
    public Writer(FileSystem fs, String dirName,
                  Class keyClass, Class valClass, boolean compress)
      throws IOException {
      this(fs, dirName, WritableComparator.get(keyClass), valClass, compress);
    }
+    /** Create the named map for keys of the named class. */
+    public Writer(Configuration conf, FileSystem fs, String dirName,
+                  Class keyClass, Class valClass, CompressionType compress)
+      throws IOException {
+      this(conf,fs,dirName,WritableComparator.get(keyClass),valClass,compress);
+    }

    /** Create the named map using the named key comparator. */
    public Writer(FileSystem fs, String dirName,
@@ -82,12 +91,24 @@ public class MapFile {
      throws IOException {
      this(fs, dirName, comparator, valClass, false);
    }
-    /** Create the named map using the named key comparator. */
+    /** Create the named map using the named key comparator.
+     * @deprecated specify a {@link CompressionType} instead
+     */
    public Writer(FileSystem fs, String dirName,
                  WritableComparator comparator, Class valClass,
                  boolean compress)
      throws IOException {

+      this(new Configuration(), fs, dirName, comparator, valClass,
+           compress ? CompressionType.RECORD : CompressionType.NONE);
+    }
+
+    /** Create the named map using the named key comparator. */
+    public Writer(Configuration conf, FileSystem fs, String dirName,
+                  WritableComparator comparator, Class valClass,
+                  SequenceFile.CompressionType compress)
+      throws IOException {
+
      this.comparator = comparator;
      this.lastKey = comparator.newKey();

@@ -99,9 +120,11 @@

      Class keyClass = comparator.getKeyClass();
      this.data =
-        new SequenceFile.Writer(fs, dataFile, keyClass, valClass, compress);
+        SequenceFile.createWriter
+        (fs,conf,dataFile,keyClass,valClass,compress);
      this.index =
-        new SequenceFile.Writer(fs, indexFile, keyClass, LongWritable.class);
+        SequenceFile.createWriter
+        (fs,conf,indexFile,keyClass,LongWritable.class,CompressionType.BLOCK);
    }
    
    /** The number of entries that are added before an index entry is added.*/
@@ -159,9 +182,7 @@
      
    private WritableComparator comparator;

-    private DataOutputBuffer keyBuf = new DataOutputBuffer();
-    private DataOutputBuffer nextBuf = new DataOutputBuffer();
-    private int nextKeyLen = -1;
+    private WritableComparable nextKey;
    private long seekPosition = -1;
    private int seekIndex = -1;
    private long firstPosition;
@@ -300,14 +321,11 @@ public class MapFile {
    public synchronized boolean seek(WritableComparable key)
      throws IOException {
      readIndex();                                // make sure index is read
-      keyBuf.reset();                             // write key to keyBuf
-      key.write(keyBuf);

      if (seekIndex != -1                         // seeked before
          && seekIndex+1 < count           
          && comparator.compare(key,keys[seekIndex+1])<0 // before next indexed
-          && comparator.compare(keyBuf.getData(), 0, keyBuf.getLength(),
-                                nextBuf.getData(), 0, nextKeyLen)
+          && comparator.compare(key, nextKey)
          >= 0) {                                 // but after last seeked
        // do nothing
      } else {
@@ -322,14 +340,14 @@ public class MapFile {
      }
      data.seek(seekPosition);
      
-      while ((nextKeyLen = data.next(nextBuf.reset())) != -1) {
-        int c = comparator.compare(keyBuf.getData(), 0, keyBuf.getLength(),
-                                   nextBuf.getData(), 0, nextKeyLen);
+      if (nextKey == null)
+        nextKey = comparator.newKey();
+
+      while (data.next(nextKey)) {
+        int c = comparator.compare(key, nextKey);
        if (c <= 0) {                             // at or beyond desired
-          data.seek(seekPosition);                // back off to previous
          return c == 0;
        }
-        seekPosition = data.getPosition();
      }

      return false;
@@ -366,7 +384,7 @@ public class MapFile {
    public synchronized Writable get(WritableComparable key, Writable val)
      throws IOException {
      if (seek(key)) {
-        next(getKey, val);                        // don't smash key
+        data.getCurrentValue(val);
        return val;
      } else
        return null;

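Note how the deprecated boolean constructors are kept as thin forwards, so existing callers compile and keep their old behaviour. The mapping, as implemented above, is:

    // The old boolean `compress` flag maps onto the new enum:
    //   true  -> CompressionType.RECORD   (per-record compression, as before)
    //   false -> CompressionType.NONE
    this(new Configuration(), fs, dirName, comparator, valClass,
         compress ? CompressionType.RECORD : CompressionType.NONE);

Also worth noting: the index file is now always written with CompressionType.BLOCK, independent of the type chosen for the data file.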
+ 36 - 21
src/java/org/apache/hadoop/io/SequenceFile.java

@@ -592,7 +592,15 @@ public class SequenceFile {
      val.writeUncompressedBytes(out);            // value
    }

-    /** Returns the current length of the output file. */
+    /** Returns the current length of the output file.
+     *
+     * <p>This always returns a synchronized position.  In other words,
+     * immediately after calling {@link Reader#seek(long)} with a position
+     * returned by this method, {@link Reader#next(Writable)} may be called.
+     * However the key may be earlier in the file than the key last written
+     * when this method was called (e.g., with block compression, it may be
+     * the first key in the block that was being written).
+     */
    public synchronized long getLength() throws IOException {
      return out.getPos();
    }
@@ -1023,13 +1031,13 @@ public class SequenceFile {
        valLenBuffer = new DataInputBuffer();
        
        keyLenInFilter = this.codec.createInputStream(keyLenBuffer);
-        keyLenIn = new DataInputStream(new BufferedInputStream(keyLenInFilter));
+        keyLenIn = new DataInputStream(keyLenInFilter);

        keyInFilter = this.codec.createInputStream(keyBuffer);
-        keyIn = new DataInputStream(new BufferedInputStream(keyInFilter));
+        keyIn = new DataInputStream(keyInFilter);

        valLenInFilter = this.codec.createInputStream(valLenBuffer);
-        valLenIn = new DataInputStream(new BufferedInputStream(valLenInFilter));
+        valLenIn = new DataInputStream(valLenInFilter);
      }
      

@@ -1058,19 +1066,17 @@

    /** Read a compressed buffer */
    private synchronized void readBuffer(DataInputBuffer buffer, 
-        CompressionInputStream filter, boolean castAway) throws IOException {
+        CompressionInputStream filter) throws IOException {
      // Read data into a temporary buffer
      DataOutputBuffer dataBuffer = new DataOutputBuffer();
      int dataBufferLength = WritableUtils.readVInt(in);
      dataBuffer.write(in, dataBufferLength);
      
-      if (false == castAway) {
-        // Reset the codec
-        filter.resetState();
-        
-        // Set up 'buffer' connected to the input-stream
-        buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength());
-      }
+      // Set up 'buffer' connected to the input-stream
+      buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength());
+
+      // Reset the codec
+      filter.resetState();
    }
    
    /** Read the next 'compressed' block */
@@ -1078,8 +1084,8 @@ public class SequenceFile {
      // Check if we need to throw away a whole block of 
      // 'values' due to 'lazy decompression' 
      if (lazyDecompress && !valuesDecompressed) {
-        readBuffer(null, null, true);
-        readBuffer(null, null, true);
+        in.seek(WritableUtils.readVInt(in)+in.getPos());
+        in.seek(WritableUtils.readVInt(in)+in.getPos());
      }
      
      // Reset internal states
@@ -1099,14 +1105,14 @@ public class SequenceFile {
      noBufferedRecords = WritableUtils.readVInt(in);
      
      // Read key lengths and keys
-      readBuffer(keyLenBuffer, keyLenInFilter, false);
-      readBuffer(keyBuffer, keyInFilter, false);
+      readBuffer(keyLenBuffer, keyLenInFilter);
+      readBuffer(keyBuffer, keyInFilter);
      noBufferedKeys = noBufferedRecords;
      
      // Read value lengths and values
      if (!lazyDecompress) {
-        readBuffer(valLenBuffer, valLenInFilter, false);
-        readBuffer(valBuffer, valInFilter, false);
+        readBuffer(valLenBuffer, valLenInFilter);
+        readBuffer(valBuffer, valInFilter);
        noBufferedValues = noBufferedRecords;
        valuesDecompressed = true;
      }
@@ -1126,8 +1132,8 @@ public class SequenceFile {
        // Check if this is the first value in the 'block' to be read
        if (lazyDecompress && !valuesDecompressed) {
          // Read the value lengths and values
-          readBuffer(valLenBuffer, valLenInFilter, false);
-          readBuffer(valBuffer, valInFilter, false);
+          readBuffer(valLenBuffer, valLenInFilter);
+          readBuffer(valBuffer, valInFilter);
          noBufferedValues = noBufferedRecords;
          valuesDecompressed = true;
        }
@@ -1377,9 +1383,18 @@ public class SequenceFile {
      }
    }

-    /** Set the current byte position in the input file. */
+    /** Set the current byte position in the input file.
+     *
+     * <p>The position passed must be a position returned by {@link
+     * Writer#getLength()} when writing this file.  To seek to an arbitrary
+     * position, use {@link Reader#sync(long)}.
+     */
    public synchronized void seek(long position) throws IOException {
      in.seek(position);
+      if (blockCompressed) {                      // trigger block read
+        noBufferedKeys = 0;
+        valuesDecompressed = true;
+      }
    }

    /** Seek to the next sync mark past a given position.*/

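The new javadoc pins down the getLength()/seek() contract. Below is a minimal sketch of the documented usage; the file name, loop bounds, and key/value classes are illustrative assumptions:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.io.UTF8;

    public class SeekDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path file = new Path("seek-demo.seq");     // hypothetical path

        SequenceFile.Writer writer = SequenceFile.createWriter(
            fs, conf, file, LongWritable.class, UTF8.class,
            CompressionType.BLOCK);
        long pos = -1;
        for (long i = 0; i < 1000; i++) {
          if (i == 500)
            pos = writer.getLength();              // a synchronized position
          writer.append(new LongWritable(i), new UTF8("value-" + i));
        }
        writer.close();

        SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, conf);
        reader.seek(pos);             // valid because pos came from getLength()
        LongWritable key = new LongWritable();
        UTF8 val = new UTF8();
        reader.next(key, val);        // with block compression, this may be an
                                      // earlier key: the first of pos's block
        reader.close();
      }
    }

For positions that did not come from getLength(), the javadoc points at sync(long), which advances to the next sync mark instead.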
+ 15 - 0
src/java/org/apache/hadoop/io/SetFile.java

@@ -20,6 +20,7 @@ import java.io.*;

import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
+import org.apache.hadoop.io.SequenceFile.CompressionType;

/** A file-based set of keys. */
public class SetFile extends MapFile {
@@ -40,6 +41,20 @@ public class SetFile extends MapFile {
      super(fs, dirName, comparator, NullWritable.class);
    }

+    /** Create a set naming the element class and compression type. */
+    public Writer(Configuration conf, FileSystem fs, String dirName,
+                  Class keyClass, SequenceFile.CompressionType compress)
+      throws IOException {
+      this(conf, fs, dirName, WritableComparator.get(keyClass), compress);
+    }
+
+    /** Create a set naming the element comparator and compression type. */
+    public Writer(Configuration conf, FileSystem fs, String dirName,
+                  WritableComparator comparator,
+                  SequenceFile.CompressionType compress) throws IOException {
+      super(conf, fs, dirName, comparator, NullWritable.class, compress);
+    }
+
    /** Append a key to a set.  The key must be strictly greater than the
     * previous key added to the set. */
    public void append(WritableComparable key) throws IOException{

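A short sketch of the new SetFile constructors; the path "example.set" and LongWritable element class are illustrative assumptions:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.io.SetFile;

    public class BlockCompressedSetFile {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        // New in this change: name the element class and a compression type.
        SetFile.Writer writer =
          new SetFile.Writer(conf, fs, "example.set",
                             LongWritable.class, CompressionType.BLOCK);
        for (long i = 0; i < 100; i++)   // elements must strictly increase
          writer.append(new LongWritable(i));
        writer.close();

        SetFile.Reader reader = new SetFile.Reader(fs, "example.set", conf);
        boolean present = reader.seek(new LongWritable(42)); // membership test
        reader.close();
      }
    }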
+ 9 - 4
src/java/org/apache/hadoop/io/compress/CompressionInputStream.java

@@ -20,8 +20,13 @@ import java.io.IOException;
import java.io.InputStream;

/**
-* A compression input stream.
-  * @author Arun C Murthy
+ * A compression input stream.
+ *
+ * <p>Implementations are assumed to be buffered.  This permits clients to
+ * reposition the underlying input stream then call {@link #resetState()},
+ * without having to also synchronize client buffers.
+ *
+ * @author Arun C Murthy
 */
public abstract class CompressionInputStream extends InputStream {
  /**
@@ -49,8 +54,8 @@ public abstract class CompressionInputStream extends InputStream {
  public abstract int read(byte[] b, int off, int len) throws IOException;

  /**
-   * Reset the compression to the initial state. Does not reset the underlying
-   * stream.
+   * Reset the decompressor to its initial state and discard any buffered data,
+   * as the underlying stream may have been repositioned.
   */
  public abstract void resetState() throws IOException;
  

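The reworked readBuffer() in SequenceFile above is the canonical client of this contract: it swaps fresh bytes into the decompressor's input buffer and then calls resetState() so the codec discards whatever it had buffered from the old position:

    // Pattern enabled by the new contract (mirrors readBuffer above):
    buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength()); // new input
    filter.resetState();  // codec drops bytes buffered from the old position

This is also why the BufferedInputStream wrappers around the codec streams could be dropped earlier in this change: an extra client-side buffer would hold stale data after the underlying stream is repositioned.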
+ 3 - 2
src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java

@@ -23,6 +23,7 @@ import java.util.Arrays;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

+import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.Writable;
@@ -40,10 +41,10 @@ public class MapFileOutputFormat extends OutputFormatBase {

    // ignore the progress parameter, since MapFile is local
    final MapFile.Writer out =
-      new MapFile.Writer(fs, file.toString(),
+      new MapFile.Writer(job, fs, file.toString(),
                         job.getMapOutputKeyClass(),
                         job.getMapOutputValueClass(),
-                         job.getBoolean("mapred.output.compress", false));
+                         SequenceFile.getCompressionType(job));

    return new RecordWriter() {


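MapFileOutputFormat now derives the compression type from the job configuration instead of a boolean flag. A hedged sketch of how a job would request block-compressed map-file output, assuming SequenceFile.getCompressionType() consults the io.seqfile.compression.type property (the property name is our assumption; it is not shown in this diff):

    JobConf job = new JobConf();
    // Assumed property consulted by SequenceFile.getCompressionType(job):
    job.set("io.seqfile.compression.type", "BLOCK");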
+ 41 - 25
src/test/org/apache/hadoop/io/TestSetFile.java

@@ -24,6 +24,7 @@ import org.apache.commons.logging.*;

import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
+import org.apache.hadoop.io.SequenceFile.CompressionType;

/** Support for flat files of binary key/value pairs. */
public class TestSetFile extends TestCase {
@@ -39,7 +40,10 @@ public class TestSetFile extends TestCase {
    FileSystem fs = new LocalFileSystem(conf);
    try {
        RandomDatum[] data = generate(10000);
-        writeTest(fs, data, FILE);
+        writeTest(fs, data, FILE, CompressionType.NONE);
+        readTest(fs, data, FILE);
+
+        writeTest(fs, data, FILE, CompressionType.BLOCK);
        readTest(fs, data, FILE);
    } finally {
        fs.close();
@@ -47,23 +51,27 @@ public class TestSetFile extends TestCase {
  }

  private static RandomDatum[] generate(int count) {
-    LOG.debug("generating " + count + " records in memory");
+    LOG.info("generating " + count + " records in memory");
    RandomDatum[] data = new RandomDatum[count];
    RandomDatum.Generator generator = new RandomDatum.Generator();
    for (int i = 0; i < count; i++) {
      generator.next();
      data[i] = generator.getValue();
    }
-    LOG.debug("sorting " + count + " records");
+    LOG.info("sorting " + count + " records");
    Arrays.sort(data);
    return data;
  }

-  private static void writeTest(FileSystem fs, RandomDatum[] data, String file)
+  private static void writeTest(FileSystem fs, RandomDatum[] data,
+                                String file, CompressionType compress)
    throws IOException {
    MapFile.delete(fs, file);
-    LOG.debug("creating with " + data.length + " records");
-    SetFile.Writer writer = new SetFile.Writer(fs, file, RandomDatum.class);
+    LOG.info("creating with " + data.length + " records");
+    SetFile.Writer writer =
+      new SetFile.Writer(conf, fs, file,
+                         WritableComparator.get(RandomDatum.class),
+                         compress);
    for (int i = 0; i < data.length; i++)
      writer.append(data[i]);
    writer.close();
@@ -72,14 +80,16 @@ public class TestSetFile extends TestCase {
  private static void readTest(FileSystem fs, RandomDatum[] data, String file)
    throws IOException {
    RandomDatum v = new RandomDatum();
-    LOG.debug("reading " + data.length + " records");
+    int sample = (int)Math.sqrt(data.length);
+    Random random = new Random();
+    LOG.info("reading " + sample + " records");
    SetFile.Reader reader = new SetFile.Reader(fs, file, conf);
-    for (int i = 0; i < data.length; i++) {
-      if (!reader.seek(data[i]))
+    for (int i = 0; i < sample; i++) {
+      if (!reader.seek(data[random.nextInt(data.length)]))
        throw new RuntimeException("wrong value at " + i);
    }
    reader.close();
-    LOG.debug("done reading " + data.length);
+    LOG.info("done reading " + data.length);
  }


@@ -89,7 +99,9 @@ public class TestSetFile extends TestCase {
    boolean create = true;
    boolean check = true;
    String file = FILE;
-    String usage = "Usage: TestSetFile (-local | -dfs <namenode:port>) [-count N] [-nocreate] [-nocheck] file";
+    String compress = "NONE";
+
+    String usage = "Usage: TestSetFile (-local | -dfs <namenode:port>) [-count N] [-nocreate] [-nocheck] [-compress type] file";
      
    if (args.length == 0) {
      System.err.println(usage);
@@ -108,26 +120,30 @@ public class TestSetFile extends TestCase {
          create = false;
        } else if (args[i].equals("-nocheck")) {
          check = false;
+        } else if (args[i].equals("-compress")) {
+          compress = args[++i];
        } else {
          // file is required parameter
          file = args[i];
        }
+      }

-        LOG.info("count = " + count);
-        LOG.info("create = " + create);
-        LOG.info("check = " + check);
-        LOG.info("file = " + file);
-
-        RandomDatum[] data = generate(count);
-
-        if (create) {
-          writeTest(fs, data, file);
-        }
-
-        if (check) {
-          readTest(fs, data, file);
-        }
+      LOG.info("count = " + count);
+      LOG.info("create = " + create);
+      LOG.info("check = " + check);
+      LOG.info("compress = " + compress);
+      LOG.info("file = " + file);
+      
+      RandomDatum[] data = generate(count);
+      
+      if (create) {
+        writeTest(fs, data, file, CompressionType.valueOf(compress));
      }
+      
+      if (check) {
+        readTest(fs, data, file);
+      }
+  
    } finally {
      fs.close();
    }
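
With the new -compress flag, the standalone test can exercise each mode from the command line, per the usage string above (path illustrative):

    java org.apache.hadoop.io.TestSetFile -local -count 10000 -compress BLOCK /tmp/test.set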