浏览代码

HADOOP-636. Add MapFile and ArrayFile constructors which accept a Progressable.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@469622 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 年之前
父节点
当前提交
cf8e859d40

+ 5 - 0
CHANGES.txt

@@ -82,6 +82,11 @@ Trunk (unreleased changes)
 
 
 22. HADOOP-658.  Update source file headers per Apache policy.  (cutting)
 22. HADOOP-658.  Update source file headers per Apache policy.  (cutting)
 
 
+23. HADOOP-636.  Add MapFile & ArrayFile constructors which accept a
+    Progressable, and pass it down to SequenceFile.  This permits
+    reduce tasks which use MapFile to still report progress while
+    writing blocks to the filesystem.  (cutting)
+
 
 
 Release 0.7.2 - 2006-10-18
 Release 0.7.2 - 2006-10-18
 
 

+ 16 - 2
src/java/org/apache/hadoop/io/ArrayFile.java

@@ -21,6 +21,9 @@ package org.apache.hadoop.io;
 import java.io.*;
 import java.io.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.conf.*;
+import org.apache.hadoop.util.*;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+
 
 
 /** A dense file-based mapping from integers to values. */
 /** A dense file-based mapping from integers to values. */
 public class ArrayFile extends MapFile {
 public class ArrayFile extends MapFile {
@@ -31,11 +34,22 @@ public class ArrayFile extends MapFile {
   public static class Writer extends MapFile.Writer {
   public static class Writer extends MapFile.Writer {
     private LongWritable count = new LongWritable(0);
     private LongWritable count = new LongWritable(0);
 
 
-    /** Create the named file for values of the named class. */
-    public Writer(FileSystem fs, String file, Class valClass) throws IOException {
+    /** Create the named file for values of the named class.
+     * @deprecated specify {@link CompressionType} and {@link Progressable}
+     */
+    public Writer(FileSystem fs, String file, Class valClass)
+      throws IOException {
       super(fs, file, LongWritable.class, valClass);
       super(fs, file, LongWritable.class, valClass);
     }
     }
 
 
+    /** Create the named file for values of the named class. */
+    public Writer(Configuration conf, FileSystem fs,
+                  String file, Class valClass,
+                  CompressionType compress, Progressable progress)
+      throws IOException {
+      super(conf, fs, file, LongWritable.class, valClass, compress, progress);
+    }
+
     /** Append a value to the file. */
     /** Append a value to the file. */
     public synchronized void append(Writable value) throws IOException {
     public synchronized void append(Writable value) throws IOException {
       super.append(count, value);                 // add to map
       super.append(count, value);                 // add to map

+ 29 - 6
src/java/org/apache/hadoop/io/MapFile.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.io;
 import java.io.*;
 import java.io.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.conf.*;
+import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 
 
 /** A file-based map from keys to values.
 /** A file-based map from keys to values.
@@ -73,13 +74,22 @@ public class MapFile {
     }
     }
 
 
     /** Create the named map for keys of the named class.
     /** Create the named map for keys of the named class.
-     * @deprecated specify a {@link CompressionType} instead
+     * @deprecated specify {@link CompressionType} and {@link Progressable}
      */
      */
     public Writer(FileSystem fs, String dirName,
     public Writer(FileSystem fs, String dirName,
                   Class keyClass, Class valClass, boolean compress)
                   Class keyClass, Class valClass, boolean compress)
       throws IOException {
       throws IOException {
       this(fs, dirName, WritableComparator.get(keyClass), valClass, compress);
       this(fs, dirName, WritableComparator.get(keyClass), valClass, compress);
     }
     }
+    /** Create the named map for keys of the named class. */
+    public Writer(Configuration conf, FileSystem fs, String dirName,
+                  Class keyClass, Class valClass,
+                  CompressionType compress, Progressable progress)
+      throws IOException {
+      this(conf,fs,dirName,WritableComparator.get(keyClass),valClass,
+           compress, progress);
+    }
+
     /** Create the named map for keys of the named class. */
     /** Create the named map for keys of the named class. */
     public Writer(Configuration conf, FileSystem fs, String dirName,
     public Writer(Configuration conf, FileSystem fs, String dirName,
                   Class keyClass, Class valClass, CompressionType compress)
                   Class keyClass, Class valClass, CompressionType compress)
@@ -87,14 +97,16 @@ public class MapFile {
       this(conf,fs,dirName,WritableComparator.get(keyClass),valClass,compress);
       this(conf,fs,dirName,WritableComparator.get(keyClass),valClass,compress);
     }
     }
 
 
-    /** Create the named map using the named key comparator. */
+    /** Create the named map using the named key comparator.
+     * @deprecated specify {@link CompressionType} and {@link Progressable}
+     */
     public Writer(FileSystem fs, String dirName,
     public Writer(FileSystem fs, String dirName,
                   WritableComparator comparator, Class valClass)
                   WritableComparator comparator, Class valClass)
       throws IOException {
       throws IOException {
       this(fs, dirName, comparator, valClass, false);
       this(fs, dirName, comparator, valClass, false);
     }
     }
     /** Create the named map using the named key comparator.
     /** Create the named map using the named key comparator.
-     * @deprecated specify a {@link CompressionType} instead
+     * @deprecated specify {@link CompressionType} and {@link Progressable}
      */
      */
     public Writer(FileSystem fs, String dirName,
     public Writer(FileSystem fs, String dirName,
                   WritableComparator comparator, Class valClass,
                   WritableComparator comparator, Class valClass,
@@ -105,11 +117,21 @@ public class MapFile {
            compress ? CompressionType.RECORD : CompressionType.NONE);
            compress ? CompressionType.RECORD : CompressionType.NONE);
     }
     }
 
 
-    /** Create the named map using the named key comparator. */
+    /** Create the named map using the named key comparator.
+     * @deprecated specify a {@link Progressable}
+     */
     public Writer(Configuration conf, FileSystem fs, String dirName,
     public Writer(Configuration conf, FileSystem fs, String dirName,
                   WritableComparator comparator, Class valClass,
                   WritableComparator comparator, Class valClass,
                   SequenceFile.CompressionType compress)
                   SequenceFile.CompressionType compress)
       throws IOException {
       throws IOException {
+      this(conf, fs, dirName, comparator, valClass, compress, null);
+    }
+    /** Create the named map using the named key comparator. */
+    public Writer(Configuration conf, FileSystem fs, String dirName,
+                  WritableComparator comparator, Class valClass,
+                  SequenceFile.CompressionType compress,
+                  Progressable progress)
+      throws IOException {
 
 
       this.comparator = comparator;
       this.comparator = comparator;
       this.lastKey = comparator.newKey();
       this.lastKey = comparator.newKey();
@@ -123,10 +145,11 @@ public class MapFile {
       Class keyClass = comparator.getKeyClass();
       Class keyClass = comparator.getKeyClass();
       this.data =
       this.data =
         SequenceFile.createWriter
         SequenceFile.createWriter
-        (fs,conf,dataFile,keyClass,valClass,compress);
+        (fs,conf,dataFile,keyClass,valClass,compress,progress);
       this.index =
       this.index =
         SequenceFile.createWriter
         SequenceFile.createWriter
-        (fs,conf,indexFile,keyClass,LongWritable.class,CompressionType.BLOCK);
+        (fs, conf, indexFile, keyClass, LongWritable.class,
+         CompressionType.BLOCK, progress);
     }
     }
     
     
     /** The number of entries that are added before an index entry is added.*/
     /** The number of entries that are added before an index entry is added.*/

+ 2 - 1
src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java

@@ -46,7 +46,8 @@ public class MapFileOutputFormat extends OutputFormatBase {
       new MapFile.Writer(job, fs, file.toString(),
       new MapFile.Writer(job, fs, file.toString(),
                          job.getMapOutputKeyClass(),
                          job.getMapOutputKeyClass(),
                          job.getMapOutputValueClass(),
                          job.getMapOutputValueClass(),
-                         SequenceFile.getCompressionType(job));
+                         SequenceFile.getCompressionType(job),
+                         progress);
 
 
     return new RecordWriter() {
     return new RecordWriter() {