浏览代码

HADOOP-732. Add support to SequenceFile for arbitrary metadata, as a set of attribute value pairs. Contributed by Runping.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@500378 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 年之前
父节点
当前提交
b93eda63dd
共有 3 个文件被更改,包括 318 次插入38 次删除
  1. 3 0
      CHANGES.txt
  2. 231 38
      src/java/org/apache/hadoop/io/SequenceFile.java
  3. 84 0
      src/test/org/apache/hadoop/io/TestSequenceFile.java

+ 3 - 0
CHANGES.txt

@@ -67,6 +67,9 @@ Trunk (unreleased changes)
     in HDFS, try another replica of the data, if any.
     in HDFS, try another replica of the data, if any.
     (Wendy Chien via cutting)
     (Wendy Chien via cutting)
 
 
+21. HADOOP-732.  Add support to SequenceFile for arbitrary metadata,
+    as a set of attribute value pairs.  (Runping Qi via cutting)
+
 
 
 Release 0.10.1 - 2007-01-10
 Release 0.10.1 - 2007-01-10
 
 

+ 231 - 38
src/java/org/apache/hadoop/io/SequenceFile.java

@@ -50,8 +50,9 @@ public class SequenceFile {
 
 
   private static final byte BLOCK_COMPRESS_VERSION = (byte)4;
   private static final byte BLOCK_COMPRESS_VERSION = (byte)4;
   private static final byte CUSTOM_COMPRESS_VERSION = (byte)5;
   private static final byte CUSTOM_COMPRESS_VERSION = (byte)5;
+  private static final byte VERSION_WITH_METADATA = (byte)6;
   private static byte[] VERSION = new byte[] {
   private static byte[] VERSION = new byte[] {
-    (byte)'S', (byte)'E', (byte)'Q', CUSTOM_COMPRESS_VERSION
+    (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA
   };
   };
 
 
   private static final int SYNC_ESCAPE = -1;      // "length" of sync entries
   private static final int SYNC_ESCAPE = -1;      // "length" of sync entries
@@ -93,7 +94,7 @@ public class SequenceFile {
                                         CompressionType val) {
                                         CompressionType val) {
     job.set("io.seqfile.compression.type", val.toString());
     job.set("io.seqfile.compression.type", val.toString());
   }
   }
-  
+    
   /**
   /**
    * Construct the preferred type of SequenceFile Writer.
    * Construct the preferred type of SequenceFile Writer.
    * @param fs The configured filesystem. 
    * @param fs The configured filesystem. 
@@ -130,7 +131,7 @@ public class SequenceFile {
     Writer writer = null;
     Writer writer = null;
     
     
     if (compressionType == CompressionType.NONE) {
     if (compressionType == CompressionType.NONE) {
-      writer = new Writer(fs, conf, name, keyClass, valClass);
+      writer = new Writer(fs, conf, name, keyClass, valClass, null, new Metadata());
     } else if (compressionType == CompressionType.RECORD) {
     } else if (compressionType == CompressionType.RECORD) {
       writer = new RecordCompressWriter(fs, conf, name, keyClass, valClass, 
       writer = new RecordCompressWriter(fs, conf, name, keyClass, valClass, 
           new DefaultCodec());
           new DefaultCodec());
@@ -161,13 +162,13 @@ public class SequenceFile {
     Writer writer = null;
     Writer writer = null;
     
     
     if (compressionType == CompressionType.NONE) {
     if (compressionType == CompressionType.NONE) {
-      writer = new Writer(fs, conf, name, keyClass, valClass, progress); 
+      writer = new Writer(fs, conf, name, keyClass, valClass, progress, new Metadata()); 
     } else if (compressionType == CompressionType.RECORD) {
     } else if (compressionType == CompressionType.RECORD) {
       writer = new RecordCompressWriter(fs, conf, name, 
       writer = new RecordCompressWriter(fs, conf, name, 
-          keyClass, valClass, new DefaultCodec(), progress);
+          keyClass, valClass, new DefaultCodec(), progress, new Metadata());
     } else if (compressionType == CompressionType.BLOCK){
     } else if (compressionType == CompressionType.BLOCK){
       writer = new BlockCompressWriter(fs, conf, name, 
       writer = new BlockCompressWriter(fs, conf, name, 
-          keyClass, valClass, new DefaultCodec(), progress);
+          keyClass, valClass, new DefaultCodec(), progress, new Metadata());
     }
     }
     
     
     return writer;
     return writer;
@@ -222,6 +223,7 @@ public class SequenceFile {
    * @param compressionType The compression type.
    * @param compressionType The compression type.
    * @param codec The compression codec.
    * @param codec The compression codec.
    * @param progress The Progressable object to track progress.
    * @param progress The Progressable object to track progress.
+   * @param metadata The metadata of the file.
    * @return Returns the handle to the constructed SequenceFile Writer.
    * @return Returns the handle to the constructed SequenceFile Writer.
    * @throws IOException
    * @throws IOException
    */
    */
@@ -229,7 +231,7 @@ public class SequenceFile {
   createWriter(FileSystem fs, Configuration conf, Path name, 
   createWriter(FileSystem fs, Configuration conf, Path name, 
       Class keyClass, Class valClass, 
       Class keyClass, Class valClass, 
       CompressionType compressionType, CompressionCodec codec,
       CompressionType compressionType, CompressionCodec codec,
-      Progressable progress) throws IOException {
+      Progressable progress, Metadata metadata) throws IOException {
     if ((codec instanceof GzipCodec) && 
     if ((codec instanceof GzipCodec) && 
         !NativeCodeLoader.isNativeCodeLoaded() && 
         !NativeCodeLoader.isNativeCodeLoaded() && 
         !ZlibFactory.isNativeZlibLoaded()) {
         !ZlibFactory.isNativeZlibLoaded()) {
@@ -240,17 +242,40 @@ public class SequenceFile {
     Writer writer = null;
     Writer writer = null;
     
     
     if (compressionType == CompressionType.NONE) {
     if (compressionType == CompressionType.NONE) {
-      writer = new Writer(fs, conf, name, keyClass, valClass, progress);
+      writer = new Writer(fs, conf, name, keyClass, valClass, progress, metadata);
     } else if (compressionType == CompressionType.RECORD) {
     } else if (compressionType == CompressionType.RECORD) {
       writer = new RecordCompressWriter(fs, conf, name, 
       writer = new RecordCompressWriter(fs, conf, name, 
-          keyClass, valClass, codec, progress);
+          keyClass, valClass, codec, progress, metadata);
     } else if (compressionType == CompressionType.BLOCK){
     } else if (compressionType == CompressionType.BLOCK){
       writer = new BlockCompressWriter(fs, conf, name, 
       writer = new BlockCompressWriter(fs, conf, name, 
-          keyClass, valClass, codec, progress);
+          keyClass, valClass, codec, progress, metadata);
     }
     }
     
     
     return writer;
     return writer;
   }
   }
+  
+  /**
+   * Construct the preferred type of SequenceFile Writer.
+   * @param fs The configured filesystem. 
+   * @param conf The configuration.
+   * @param name The name of the file. 
+   * @param keyClass The 'key' type.
+   * @param valClass The 'value' type.
+   * @param compressionType The compression type.
+   * @param codec The compression codec.
+   * @param progress The Progressable object to track progress.
+   * @return Returns the handle to the constructed SequenceFile Writer.
+   * @throws IOException
+   */
+  public static Writer
+  createWriter(FileSystem fs, Configuration conf, Path name, 
+      Class keyClass, Class valClass, 
+      CompressionType compressionType, CompressionCodec codec,
+      Progressable progress) throws IOException {
+    Writer writer = createWriter(fs, conf, name, keyClass, valClass, 
+        compressionType, codec, progress, new Metadata());
+    return writer;
+  }
 
 
   /**
   /**
    * Construct the preferred type of 'raw' SequenceFile Writer.
    * Construct the preferred type of 'raw' SequenceFile Writer.
@@ -259,13 +284,14 @@ public class SequenceFile {
    * @param valClass The 'value' type.
    * @param valClass The 'value' type.
    * @param compress Compress data?
    * @param compress Compress data?
    * @param blockCompress Compress blocks?
    * @param blockCompress Compress blocks?
+   * @param metadata The metadata of the file.
    * @return Returns the handle to the constructed SequenceFile Writer.
    * @return Returns the handle to the constructed SequenceFile Writer.
    * @throws IOException
    * @throws IOException
    */
    */
   private static Writer
   private static Writer
   createWriter(Configuration conf, FSDataOutputStream out, 
   createWriter(Configuration conf, FSDataOutputStream out, 
       Class keyClass, Class valClass, boolean compress, boolean blockCompress,
       Class keyClass, Class valClass, boolean compress, boolean blockCompress,
-      CompressionCodec codec)
+      CompressionCodec codec, Metadata metadata)
   throws IOException {
   throws IOException {
     if ((codec instanceof GzipCodec) && 
     if ((codec instanceof GzipCodec) && 
         !NativeCodeLoader.isNativeCodeLoaded() && 
         !NativeCodeLoader.isNativeCodeLoaded() && 
@@ -277,16 +303,37 @@ public class SequenceFile {
     Writer writer = null;
     Writer writer = null;
 
 
     if (!compress) {
     if (!compress) {
-      writer = new Writer(conf, out, keyClass, valClass);
+      writer = new Writer(conf, out, keyClass, valClass, metadata);
     } else if (compress && !blockCompress) {
     } else if (compress && !blockCompress) {
-      writer = new RecordCompressWriter(conf, out, keyClass, valClass, codec);
+      writer = new RecordCompressWriter(conf, out, keyClass, valClass, codec, metadata);
     } else {
     } else {
-      writer = new BlockCompressWriter(conf, out, keyClass, valClass, codec);
+      writer = new BlockCompressWriter(conf, out, keyClass, valClass, codec, metadata);
     }
     }
     
     
     return writer;
     return writer;
   }
   }
 
 
+  /**
+   * Construct the preferred type of 'raw' SequenceFile Writer.
+   * @param out The stream on top which the writer is to be constructed.
+   * @param keyClass The 'key' type.
+   * @param valClass The 'value' type.
+   * @param compress Compress data?
+   * @param blockCompress Compress blocks?
+   * @return Returns the handle to the constructed SequenceFile Writer.
+   * @throws IOException
+   */
+  private static Writer
+  createWriter(Configuration conf, FSDataOutputStream out, 
+      Class keyClass, Class valClass, boolean compress, boolean blockCompress,
+      CompressionCodec codec)
+  throws IOException {
+    Writer writer = createWriter(conf, out, keyClass, valClass, compress, 
+        blockCompress, codec, new Metadata());
+    return writer;
+  }
+
+  
   /**
   /**
    * Construct the preferred type of 'raw' SequenceFile Writer.
    * Construct the preferred type of 'raw' SequenceFile Writer.
    * @param conf The configuration.
    * @param conf The configuration.
@@ -295,13 +342,14 @@ public class SequenceFile {
    * @param valClass The 'value' type.
    * @param valClass The 'value' type.
    * @param compressionType The compression type.
    * @param compressionType The compression type.
    * @param codec The compression codec.
    * @param codec The compression codec.
+   * @param metadata The metadata of the file.
    * @return Returns the handle to the constructed SequenceFile Writer.
    * @return Returns the handle to the constructed SequenceFile Writer.
    * @throws IOException
    * @throws IOException
    */
    */
   public static Writer
   public static Writer
   createWriter(Configuration conf, FSDataOutputStream out, 
   createWriter(Configuration conf, FSDataOutputStream out, 
       Class keyClass, Class valClass, CompressionType compressionType,
       Class keyClass, Class valClass, CompressionType compressionType,
-      CompressionCodec codec)
+      CompressionCodec codec, Metadata metadata)
   throws IOException {
   throws IOException {
     if ((codec instanceof GzipCodec) && 
     if ((codec instanceof GzipCodec) && 
         !NativeCodeLoader.isNativeCodeLoaded() && 
         !NativeCodeLoader.isNativeCodeLoaded() && 
@@ -313,15 +361,37 @@ public class SequenceFile {
     Writer writer = null;
     Writer writer = null;
 
 
     if (compressionType == CompressionType.NONE) {
     if (compressionType == CompressionType.NONE) {
-      writer = new Writer(conf, out, keyClass, valClass);
+      writer = new Writer(conf, out, keyClass, valClass, metadata);
     } else if (compressionType == CompressionType.RECORD) {
     } else if (compressionType == CompressionType.RECORD) {
-      writer = new RecordCompressWriter(conf, out, keyClass, valClass, codec);
+      writer = new RecordCompressWriter(conf, out, keyClass, valClass, codec, metadata);
     } else if (compressionType == CompressionType.BLOCK){
     } else if (compressionType == CompressionType.BLOCK){
-      writer = new BlockCompressWriter(conf, out, keyClass, valClass, codec);
+      writer = new BlockCompressWriter(conf, out, keyClass, valClass, codec, metadata);
     }
     }
     
     
     return writer;
     return writer;
   }
   }
+  
+  /**
+   * Construct the preferred type of 'raw' SequenceFile Writer.
+   * @param conf The configuration.
+   * @param out The stream on top which the writer is to be constructed.
+   * @param keyClass The 'key' type.
+   * @param valClass The 'value' type.
+   * @param compressionType The compression type.
+   * @param codec The compression codec.
+   * @return Returns the handle to the constructed SequenceFile Writer.
+   * @throws IOException
+   */
+  public static Writer
+  createWriter(Configuration conf, FSDataOutputStream out, 
+      Class keyClass, Class valClass, CompressionType compressionType,
+      CompressionCodec codec)
+  throws IOException {
+    Writer writer = createWriter(conf, out, keyClass, valClass, compressionType,
+        codec, new Metadata());
+    return writer;
+  }
+  
 
 
   /** The interface to 'raw' values of SequenceFiles. */
   /** The interface to 'raw' values of SequenceFiles. */
   public static interface ValueBytes {
   public static interface ValueBytes {
@@ -424,6 +494,99 @@ public class SequenceFile {
 
 
   } // CompressedBytes
   } // CompressedBytes
   
   
+  /**
+   * The class encapsulating with the metadata of a file.
+   * The metadata of a file is a list of attribute name/value
+   * pairs of Text type.
+   *
+   */
+  static class Metadata implements Writable {
+
+    private TreeMap<Text, Text> theMetadata;
+    
+    public Metadata() {
+      this(new TreeMap<Text, Text>());
+    }
+    
+    public Metadata(TreeMap<Text, Text> arg) {
+      if (arg == null) {
+        this.theMetadata = new TreeMap<Text, Text>();
+      } else {
+        this.theMetadata = arg;
+      }
+    }
+    
+    public Text get(Text name) {
+      return this.theMetadata.get(name);
+    }
+    
+    public void set(Text name, Text value) {
+      this.theMetadata.put(name, value);
+    }
+    
+    public TreeMap<Text, Text> getMetadata() {
+      return new TreeMap<Text, Text>(this.theMetadata);
+    }
+    
+    public void write(DataOutput out) throws IOException {
+      out.writeInt(this.theMetadata.size());
+      Iterator iter = this.theMetadata.entrySet().iterator();
+      while (iter.hasNext()) {
+        Map.Entry<Text, Text> en = (Map.Entry<Text, Text>)iter.next();
+        en.getKey().write(out);
+        en.getValue().write(out);
+      }
+    }
+
+    public void readFields(DataInput in) throws IOException {
+      int sz = in.readInt();
+      if (sz < 0) throw new IOException("Invalid size: " + sz + " for file metadata object");
+      this.theMetadata = new TreeMap<Text, Text>();
+      for (int i = 0; i < sz; i++) {
+        Text key = new Text();
+        Text val = new Text();
+        key.readFields(in);
+        val.readFields(in);
+        this.theMetadata.put(key, val);
+      }    
+    }
+    
+    public boolean equals(Metadata other) {
+      if (other == null) return false;
+      if (this.theMetadata.size() != other.theMetadata.size()) {
+        return false;
+      }
+      Iterator iter1 = this.theMetadata.entrySet().iterator();
+      Iterator iter2 = other.theMetadata.entrySet().iterator();
+      while (iter1.hasNext() && iter2.hasNext()) {
+        Map.Entry<Text, Text> en1 = (Map.Entry<Text, Text>)iter1.next();
+        Map.Entry<Text, Text> en2 = (Map.Entry<Text, Text>)iter2.next();
+        if (!en1.getKey().equals(en2.getKey())) {
+           return false;
+        }
+        if (!en1.getValue().equals(en2.getValue())) {
+           return false;
+        }
+      }
+      if (iter1.hasNext() || iter2.hasNext()) {
+        return false;
+      }
+      return true;
+    }
+    
+    public String toString() {
+      StringBuffer sb = new StringBuffer();
+      sb.append("size: ").append(this.theMetadata.size()).append("\n");
+      Iterator iter = this.theMetadata.entrySet().iterator();
+      while (iter.hasNext()) {
+        Map.Entry<Text, Text> en = (Map.Entry<Text, Text>)iter.next();
+        sb.append("\t").append(en.getKey().toString()).append("\t").append(en.getValue().toString());
+        sb.append("\n");
+      }
+      return sb.toString();
+    }
+  }
+  
   /** Write key/value pairs to a sequence-format file. */
   /** Write key/value pairs to a sequence-format file. */
   public static class Writer {
   public static class Writer {
     Configuration conf;
     Configuration conf;
@@ -438,6 +601,7 @@ public class SequenceFile {
     CompressionCodec codec = null;
     CompressionCodec codec = null;
     CompressionOutputStream deflateFilter = null;
     CompressionOutputStream deflateFilter = null;
     DataOutputStream deflateOut = null;
     DataOutputStream deflateOut = null;
+    Metadata metadata = null;
 
 
     // Insert a globally unique 16-byte value every few entries, so that one
     // Insert a globally unique 16-byte value every few entries, so that one
     // can seek into the middle of a file and then synchronize with record
     // can seek into the middle of a file and then synchronize with record
@@ -462,24 +626,24 @@ public class SequenceFile {
     public Writer(FileSystem fs, Configuration conf, Path name, 
     public Writer(FileSystem fs, Configuration conf, Path name, 
         Class keyClass, Class valClass)
         Class keyClass, Class valClass)
       throws IOException {
       throws IOException {
-      this(fs, conf, name, keyClass, valClass, null);
+      this(fs, conf, name, keyClass, valClass, null, new Metadata());
     }
     }
     
     
     /** Create the named file with write-progress reporter. */
     /** Create the named file with write-progress reporter. */
     public Writer(FileSystem fs, Configuration conf, Path name, 
     public Writer(FileSystem fs, Configuration conf, Path name, 
-        Class keyClass, Class valClass, Progressable progress)
+        Class keyClass, Class valClass, Progressable progress, Metadata metadata)
       throws IOException {
       throws IOException {
-      init(name, conf, fs.create(name, progress), keyClass, valClass, false, null);
+      init(name, conf, fs.create(name, progress), keyClass, valClass, false, null, metadata);
       initializeFileHeader();
       initializeFileHeader();
       writeFileHeader();
       writeFileHeader();
       finalizeFileHeader();
       finalizeFileHeader();
     }
     }
-
+    
     /** Write to an arbitrary stream using a specified buffer size. */
     /** Write to an arbitrary stream using a specified buffer size. */
     private Writer(Configuration conf, FSDataOutputStream out, 
     private Writer(Configuration conf, FSDataOutputStream out, 
-        Class keyClass, Class valClass)
+        Class keyClass, Class valClass, Metadata metadata)
     throws IOException {
     throws IOException {
-      init(null, conf, out, keyClass, valClass, false, null);
+      init(null, conf, out, keyClass, valClass, false, null, metadata);
       
       
       initializeFileHeader();
       initializeFileHeader();
       writeFileHeader();
       writeFileHeader();
@@ -514,12 +678,13 @@ public class SequenceFile {
       if(this.isCompressed()) {
       if(this.isCompressed()) {
         Text.writeString(out, (codec.getClass()).getName());
         Text.writeString(out, (codec.getClass()).getName());
       }
       }
+      this.metadata.write(out);
     }
     }
-
+    
     /** Initialize. */
     /** Initialize. */
     void init(Path name, Configuration conf, FSDataOutputStream out,
     void init(Path name, Configuration conf, FSDataOutputStream out,
                       Class keyClass, Class valClass,
                       Class keyClass, Class valClass,
-                      boolean compress, CompressionCodec codec) 
+                      boolean compress, CompressionCodec codec, Metadata metadata) 
     throws IOException {
     throws IOException {
       this.target = name;
       this.target = name;
       this.conf = conf;
       this.conf = conf;
@@ -528,6 +693,7 @@ public class SequenceFile {
       this.valClass = valClass;
       this.valClass = valClass;
       this.compress = compress;
       this.compress = compress;
       this.codec = codec;
       this.codec = codec;
+      this.metadata = metadata;
       if(this.codec != null) {
       if(this.codec != null) {
         ReflectionUtils.setConf(this.codec, this.conf);
         ReflectionUtils.setConf(this.codec, this.conf);
         this.deflateFilter = this.codec.createOutputStream(buffer);
         this.deflateFilter = this.codec.createOutputStream(buffer);
@@ -644,7 +810,7 @@ public class SequenceFile {
     public RecordCompressWriter(FileSystem fs, Configuration conf, Path name, 
     public RecordCompressWriter(FileSystem fs, Configuration conf, Path name, 
         Class keyClass, Class valClass, CompressionCodec codec) 
         Class keyClass, Class valClass, CompressionCodec codec) 
     throws IOException {
     throws IOException {
-      super.init(name, conf, fs.create(name), keyClass, valClass, true, codec);
+      super.init(name, conf, fs.create(name), keyClass, valClass, true, codec, new Metadata());
       
       
       initializeFileHeader();
       initializeFileHeader();
       writeFileHeader();
       writeFileHeader();
@@ -654,28 +820,36 @@ public class SequenceFile {
     /** Create the named file with write-progress reporter. */
     /** Create the named file with write-progress reporter. */
     public RecordCompressWriter(FileSystem fs, Configuration conf, Path name, 
     public RecordCompressWriter(FileSystem fs, Configuration conf, Path name, 
         Class keyClass, Class valClass, CompressionCodec codec,
         Class keyClass, Class valClass, CompressionCodec codec,
-        Progressable progress)
+        Progressable progress, Metadata metadata)
     throws IOException {
     throws IOException {
       super.init(name, conf, fs.create(name, progress), 
       super.init(name, conf, fs.create(name, progress), 
-          keyClass, valClass, true, codec);
+          keyClass, valClass, true, codec, metadata);
       
       
       initializeFileHeader();
       initializeFileHeader();
       writeFileHeader();
       writeFileHeader();
       finalizeFileHeader();
       finalizeFileHeader();
     }
     }
     
     
+    /** Create the named file with write-progress reporter. */
+    public RecordCompressWriter(FileSystem fs, Configuration conf, Path name, 
+        Class keyClass, Class valClass, CompressionCodec codec,
+        Progressable progress)
+    throws IOException {
+      this(fs, conf, name, keyClass, valClass, codec, progress, new Metadata());
+    }
+    
     /** Write to an arbitrary stream using a specified buffer size. */
     /** Write to an arbitrary stream using a specified buffer size. */
     private RecordCompressWriter(Configuration conf, FSDataOutputStream out,
     private RecordCompressWriter(Configuration conf, FSDataOutputStream out,
-                   Class keyClass, Class valClass, CompressionCodec codec)
+                   Class keyClass, Class valClass, CompressionCodec codec, Metadata metadata)
       throws IOException {
       throws IOException {
-      super.init(null, conf, out, keyClass, valClass, true, codec);
+      super.init(null, conf, out, keyClass, valClass, true, codec, metadata);
       
       
       initializeFileHeader();
       initializeFileHeader();
       writeFileHeader();
       writeFileHeader();
       finalizeFileHeader();
       finalizeFileHeader();
       
       
     }
     }
-
+    
     boolean isCompressed() { return true; }
     boolean isCompressed() { return true; }
     boolean isBlockCompressed() { return false; }
     boolean isBlockCompressed() { return false; }
 
 
@@ -752,7 +926,7 @@ public class SequenceFile {
     public BlockCompressWriter(FileSystem fs, Configuration conf, Path name, 
     public BlockCompressWriter(FileSystem fs, Configuration conf, Path name, 
         Class keyClass, Class valClass, CompressionCodec codec) 
         Class keyClass, Class valClass, CompressionCodec codec) 
     throws IOException {
     throws IOException {
-      super.init(name, conf, fs.create(name), keyClass, valClass, true, codec);
+      super.init(name, conf, fs.create(name), keyClass, valClass, true, codec, new Metadata());
       init(conf.getInt("io.seqfile.compress.blocksize", 1000000));
       init(conf.getInt("io.seqfile.compress.blocksize", 1000000));
       
       
       initializeFileHeader();
       initializeFileHeader();
@@ -763,10 +937,10 @@ public class SequenceFile {
     /** Create the named file with write-progress reporter. */
     /** Create the named file with write-progress reporter. */
     public BlockCompressWriter(FileSystem fs, Configuration conf, Path name, 
     public BlockCompressWriter(FileSystem fs, Configuration conf, Path name, 
         Class keyClass, Class valClass, CompressionCodec codec,
         Class keyClass, Class valClass, CompressionCodec codec,
-        Progressable progress)
+        Progressable progress, Metadata metadata)
     throws IOException {
     throws IOException {
       super.init(name, conf, fs.create(name, progress), keyClass, valClass, 
       super.init(name, conf, fs.create(name, progress), keyClass, valClass, 
-          true, codec);
+          true, codec, metadata);
       init(conf.getInt("io.seqfile.compress.blocksize", 1000000));
       init(conf.getInt("io.seqfile.compress.blocksize", 1000000));
       
       
       initializeFileHeader();
       initializeFileHeader();
@@ -774,18 +948,26 @@ public class SequenceFile {
       finalizeFileHeader();
       finalizeFileHeader();
     }
     }
     
     
+    /** Create the named file with write-progress reporter. */
+    public BlockCompressWriter(FileSystem fs, Configuration conf, Path name, 
+        Class keyClass, Class valClass, CompressionCodec codec,
+        Progressable progress)
+    throws IOException {
+      this(fs, conf, name, keyClass, valClass, codec, progress, new Metadata());
+    }
+    
     /** Write to an arbitrary stream using a specified buffer size. */
     /** Write to an arbitrary stream using a specified buffer size. */
     private BlockCompressWriter(Configuration conf, FSDataOutputStream out,
     private BlockCompressWriter(Configuration conf, FSDataOutputStream out,
-                   Class keyClass, Class valClass, CompressionCodec codec)
+                   Class keyClass, Class valClass, CompressionCodec codec, Metadata metadata)
       throws IOException {
       throws IOException {
-      super.init(null, conf, out, keyClass, valClass, true, codec);
+      super.init(null, conf, out, keyClass, valClass, true, codec, metadata);
       init(1000000);
       init(1000000);
       
       
       initializeFileHeader();
       initializeFileHeader();
       writeFileHeader();
       writeFileHeader();
       finalizeFileHeader();
       finalizeFileHeader();
     }
     }
-
+    
     boolean isCompressed() { return true; }
     boolean isCompressed() { return true; }
     boolean isBlockCompressed() { return true; }
     boolean isBlockCompressed() { return true; }
 
 
@@ -928,6 +1110,7 @@ public class SequenceFile {
     private Class valClass;
     private Class valClass;
 
 
     private CompressionCodec codec = null;
     private CompressionCodec codec = null;
+    private Metadata metadata = null;
     
     
     private byte[] sync = new byte[SYNC_HASH_SIZE];
     private byte[] sync = new byte[SYNC_HASH_SIZE];
     private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
     private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
@@ -1046,6 +1229,11 @@ public class SequenceFile {
         }
         }
       }
       }
       
       
+      this.metadata = new Metadata();
+      if (version >= VERSION_WITH_METADATA) {    // if version >= 6
+        this.metadata.readFields(in);
+      }
+      
       if (version > 1) {                          // if version > 1
       if (version > 1) {                          // if version > 1
         in.readFully(sync);                       // read sync bytes
         in.readFully(sync);                       // read sync bytes
       }
       }
@@ -1095,6 +1283,11 @@ public class SequenceFile {
     /** Returns the compression codec of data in this file. */
     /** Returns the compression codec of data in this file. */
     public CompressionCodec getCompressionCodec() { return codec; }
     public CompressionCodec getCompressionCodec() { return codec; }
 
 
+    /** Returns the metadata object of the file */
+    public Metadata getMetadata() {
+      return this.metadata;
+    }
+    
     /** Returns the configuration used for this file. */
     /** Returns the configuration used for this file. */
     Configuration getConf() { return conf; }
     Configuration getConf() { return conf; }
     
     

+ 84 - 0
src/test/org/apache/hadoop/io/TestSequenceFile.java

@@ -317,7 +317,91 @@ public class TestSequenceFile extends TestCase {
     return sorter;
     return sorter;
   }
   }
 
 
+  /** Unit tests for SequenceFile metadata. */
+  public void testSequenceFileMetadata() throws Exception {
+    LOG.info("Testing SequenceFile with metadata");
+    int count = 1024 * 10;
+    int megabytes = 1;
+    int factor = 5;
+    CompressionCodec codec = new DefaultCodec();
+    Path file = new Path(System.getProperty("test.build.data",".")+"/test.seq.metadata");
+    Path recordCompressedFile = 
+      new Path(System.getProperty("test.build.data",".")+"/test.rc.seq.metadata");
+    Path blockCompressedFile = 
+      new Path(System.getProperty("test.build.data",".")+"/test.bc.seq.metadata");
+ 
+    FileSystem fs = FileSystem.getLocal(conf);
+    SequenceFile.Metadata theMetadata = new SequenceFile.Metadata();
+    theMetadata.set(new Text("name_1"), new Text("value_1"));
+    theMetadata.set(new Text("name_2"), new Text("value_2"));
+    theMetadata.set(new Text("name_3"), new Text("value_3"));
+    theMetadata.set(new Text("name_4"), new Text("value_4"));
+    
+    int seed = new Random().nextInt();
+    
+    try {
+      // SequenceFile.Writer
+      writeMetadataTest(fs, count, seed, file, CompressionType.NONE, null, theMetadata);
+      SequenceFile.Metadata aMetadata = readMetadata(fs, file);
+      if (!theMetadata.equals(aMetadata)) {
+        LOG.info("The original metadata:\n" + theMetadata.toString());
+        LOG.info("The retrieved metadata:\n" + aMetadata.toString());
+        throw new RuntimeException("metadata not match:  " + 1);
+      }
+      // SequenceFile.RecordCompressWriter
+      writeMetadataTest(fs, count, seed, recordCompressedFile, CompressionType.RECORD, 
+          codec, theMetadata);
+      aMetadata = readMetadata(fs, recordCompressedFile);
+      if (!theMetadata.equals(aMetadata)) {
+        LOG.info("The original metadata:\n" + theMetadata.toString());
+        LOG.info("The retrieved metadata:\n" + aMetadata.toString());
+        throw new RuntimeException("metadata not match:  " + 2);
+      }
+      // SequenceFile.BlockCompressWriter
+      writeMetadataTest(fs, count, seed, blockCompressedFile, CompressionType.BLOCK,
+          codec, theMetadata);
+      aMetadata =readMetadata(fs, blockCompressedFile);
+      if (!theMetadata.equals(aMetadata)) {
+        LOG.info("The original metadata:\n" + theMetadata.toString());
+        LOG.info("The retrieved metadata:\n" + aMetadata.toString());
+        throw new RuntimeException("metadata not match:  " + 3);
+      }
+    } finally {
+      fs.close();
+    }
+    LOG.info("Successfully tested SequenceFile with metadata");
+  }
+  
+  
+  private static SequenceFile.Metadata readMetadata(FileSystem fs, Path file)
+  throws IOException {
+  LOG.info("reading file: " + file.toString() + "\n");
+  SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, conf);
+  SequenceFile.Metadata meta = reader.getMetadata(); 
+  reader.close();
+  return meta;
+  }
 
 
+  private static void writeMetadataTest(FileSystem fs, int count, int seed, Path file, 
+      CompressionType compressionType, CompressionCodec codec, SequenceFile.Metadata metadata)
+    throws IOException {
+    fs.delete(file);
+    LOG.info("creating " + count + " records with metadata and with" + compressionType +
+              " compression");
+    SequenceFile.Writer writer = 
+      SequenceFile.createWriter(fs, conf, file, 
+          RandomDatum.class, RandomDatum.class, compressionType, codec, null, metadata);
+    RandomDatum.Generator generator = new RandomDatum.Generator(seed);
+    for (int i = 0; i < count; i++) {
+      generator.next();
+      RandomDatum key = generator.getKey();
+      RandomDatum value = generator.getValue();
+
+      writer.append(key, value);
+    }
+    writer.close();
+  }
+  
   /** For debugging and testing. */
   /** For debugging and testing. */
   public static void main(String[] args) throws Exception {
   public static void main(String[] args) throws Exception {
     int count = 1024 * 1024;
     int count = 1024 * 1024;