
HADOOP-412. Add filtering input format. Also move JobConf.newInstance() to a util class. Contributed by Hairong.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@428083 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 19 years ago
Parent commit: 31ad29d385

+ 5 - 0
CHANGES.txt

@@ -134,6 +134,11 @@ Trunk (unreleased changes)
 38. HADOOP-411.  Add unit tests for command line parser.
     (Hairong Kuang via cutting)
 
+39. HADOOP-412.  Add MapReduce input formats that support filtering
+    of SequenceFile data, including sampling and regex matching.
+    Also, move JobConf.newInstance() to a new utility class.
+    (Hairong Kuang via cutting)
+
 
 Release 0.4.0 - 2006-06-28
 

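For reference, a minimal sketch of how a job opts into the new filtering input format described above; the driver setup and input path are illustrative, not part of this patch:

    JobConf job = new JobConf(new Configuration());
    job.setInputFormat(SequenceFileInputFilter.class);   // the new filtering format
    job.setInputPath(new Path("in"));                    // hypothetical dir of SequenceFiles
    // Sample by key: keep only records whose key matches a regex.
    SequenceFileInputFilter.setFilterClass(job,
        SequenceFileInputFilter.RegexFilter.class);
    SequenceFileInputFilter.RegexFilter.setPattern(job, "\\A10*");
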
+ 27 - 28
src/java/org/apache/hadoop/io/SequenceFile.java

@@ -236,9 +236,11 @@ public class SequenceFile {
     private int keyLength;
 
     private boolean inflateValues;
-    private byte[] inflateIn = new byte[8192];
-    private DataOutputBuffer inflateOut = new DataOutputBuffer();
     private Inflater inflater = new Inflater();
+    private InflaterInputStream inflateFilter =
+        new InflaterInputStream(inBuf, inflater);
+    private DataInputStream inflateIn =
+        new DataInputStream(new BufferedInputStream(inflateFilter));
     private Configuration conf;
 
     /** @deprecated Call {@link #Reader(FileSystem,Path,Configuration)}.*/
@@ -304,6 +306,8 @@ public class SequenceFile {
     /** Close the file. */
     public synchronized void close() throws IOException {
       in.close();
+      inflateIn.close();
+      inflater.end();
     }
 
     /** Returns the class of keys in this file. */
@@ -337,6 +341,25 @@ public class SequenceFile {
       return true;
     }
 
+    /** Read the current value in the buffer into <code>val</code>. */
+    public synchronized void getCurrentValue(Writable val)
+        throws IOException {
+        if(val instanceof Configurable) {
+            ((Configurable) val).setConf(this.conf);
+        }
+        if (inflateValues) {
+            inflater.reset();
+            val.readFields(inflateIn);
+        } else {
+            val.readFields(inBuf);
+            if (inBuf.getPosition() != inBuf.getLength()) {
+                throw new IOException("value: read "+(inBuf.getPosition()-keyLength)
+                                      + " bytes, should read " +
+                                     (inBuf.getLength()-keyLength));
+            }
+        }        
+    }
+    
     /** Read the next key/value pair in the file into <code>key</code> and
      * <code>val</code>.  Returns true if such a pair exists and false when at
      * end of file */
@@ -348,36 +371,12 @@ public class SequenceFile {
       boolean more = next(key);
 
       if (more) {
-
-        if (inflateValues) {
-          inflater.reset();
-          inflater.setInput(outBuf.getData(), keyLength,
-                            outBuf.getLength()-keyLength);
-          inflateOut.reset();
-          while (!inflater.finished()) {
-            try {
-              int count = inflater.inflate(inflateIn);
-              inflateOut.write(inflateIn, 0, count);
-            } catch (DataFormatException e) {
-              throw new IOException (e.toString());
-            }
-          }
-          inBuf.reset(inflateOut.getData(), inflateOut.getLength());
-        }
-        if(val instanceof Configurable) {
-          ((Configurable) val).setConf(this.conf);
-        }
-        val.readFields(inBuf);
-
-        if (inBuf.getPosition() != inBuf.getLength())
-          throw new IOException(val+" read "+(inBuf.getPosition()-keyLength)
-                                + " bytes, should read " +
-                                (inBuf.getLength()-keyLength));
+          getCurrentValue(val);
       }
 
       return more;
     }
-
+    
     /** Read the next key/value pair in the file into <code>buffer</code>.
      * Returns the length of the key read, or -1 if at end of file.  The length
      * of the value may be computed by calling buffer.getLength() before and

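The refactoring above also makes getCurrentValue() public on SequenceFile.Reader, so a caller can read a key first and only deserialize the value when the key is of interest; this is what the new filtering record reader relies on. A sketch, assuming the key-only next(Writable) that the two-argument next() above delegates to:

    SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
    Text key = new Text();
    BytesWritable value = new BytesWritable();
    while (reader.next(key)) {                  // reads the key only
      if (key.toString().startsWith("1")) {     // arbitrary example predicate
        reader.getCurrentValue(value);          // deserialize the value on demand
        // ... process key/value ...
      }
    }
    reader.close();
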
+ 4 - 1
src/java/org/apache/hadoop/io/WritableUtils.java

@@ -19,6 +19,8 @@ package org.apache.hadoop.io;
 import java.io.*;
 
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.ReflectionUtils;
+
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
@@ -220,7 +222,8 @@ public final class WritableUtils  {
    */
   public static Writable clone(Writable orig, JobConf conf) {
     try {
-      Writable newInst = (Writable)conf.newInstance(orig.getClass());
+      Writable newInst = (Writable)ReflectionUtils.newInstance(orig.getClass(),
+                                                               conf);
       CopyInCopyOutBuffer buffer = (CopyInCopyOutBuffer)cloneBuffers.get();
       buffer.outBuffer.reset();
       orig.write(buffer.outBuffer);

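The behavior of clone() is unchanged; only the instantiation now goes through the shared utility. Usage stays as before, e.g. (the Text value is illustrative):

    Text original = new Text("some record");
    Text copy = (Text) WritableUtils.clone(original, job);  // job: an existing JobConf
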
+ 3 - 1
src/java/org/apache/hadoop/mapred/CombiningCollector.java

@@ -20,6 +20,7 @@ import java.io.*;
 import java.util.*;
 
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /** Implements partial value reduction during mapping.  This can minimize the
  * size of intermediate data.  Buffers a list of values for each unique key,
@@ -41,7 +42,8 @@ class CombiningCollector implements OutputCollector {
     this.job = job;
     this.out = out;
     this.reporter = reporter;
-    this.combiner = (Reducer)job.newInstance(job.getCombinerClass());
+    this.combiner = (Reducer)ReflectionUtils.newInstance(job.getCombinerClass(),
+                                                         job);
     this.keyToValues = new TreeMap(job.getOutputKeyComparator());
     this.limit = job.getInt("mapred.combine.buffer.size", 100000);
   }

+ 8 - 23
src/java/org/apache/hadoop/mapred/JobConf.java

@@ -20,8 +20,6 @@ package org.apache.hadoop.mapred;
 import java.io.IOException;
 import java.io.File;
 
-import java.lang.reflect.Constructor;
-
 import java.util.StringTokenizer;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -43,6 +41,7 @@ import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.hadoop.mapred.lib.HashPartitioner;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /** A map/reduce job configuration.  This names the {@link Mapper}, combiner
  * (if any), {@link Partitioner}, {@link Reducer}, {@link InputFormat}, and
@@ -280,17 +279,19 @@ public class JobConf extends Configuration {
   }
 
   public InputFormat getInputFormat() {
-    return (InputFormat)newInstance(getClass("mapred.input.format.class",
+    return (InputFormat)ReflectionUtils.newInstance(getClass("mapred.input.format.class",
                                              TextInputFormat.class,
-                                             InputFormat.class));
+                                             InputFormat.class),
+                                             this);
   }
   public void setInputFormat(Class theClass) {
     setClass("mapred.input.format.class", theClass, InputFormat.class);
   }
   public OutputFormat getOutputFormat() {
-    return (OutputFormat)newInstance(getClass("mapred.output.format.class",
+    return (OutputFormat)ReflectionUtils.newInstance(getClass("mapred.output.format.class",
                                               TextOutputFormat.class,
-                                              OutputFormat.class));
+                                              OutputFormat.class),
+                                              this);
   }
   public void setOutputFormat(Class theClass) {
     setClass("mapred.output.format.class", theClass, OutputFormat.class);
@@ -392,7 +393,7 @@ public class JobConf extends Configuration {
     Class theClass = getClass("mapred.output.key.comparator.class", null,
                               WritableComparator.class);
     if (theClass != null)
-      return (WritableComparator)newInstance(theClass);
+      return (WritableComparator)ReflectionUtils.newInstance(theClass, this);
     return WritableComparator.get(getMapOutputKeyClass());
   }
 
@@ -486,22 +487,6 @@ public class JobConf extends Configuration {
     set("mapred.job.name", name);
   }
   
-  private static final Class[] emptyArray = new Class[]{};
-  
-  public Object newInstance(Class theClass) {
-    Object result;
-    try {
-      Constructor meth = theClass.getDeclaredConstructor(emptyArray);
-      meth.setAccessible(true);
-      result = meth.newInstance(emptyArray);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-    if (result instanceof JobConfigurable)
-      ((JobConfigurable)result).configure(this);
-    return result;
-  }
-
   /** Find a jar that contains a class of the same name, if any.
    * It will return a jar file, even if that is not the first thing
    * on the class path that has a class with the same name.

+ 6 - 3
src/java/org/apache/hadoop/mapred/MapRunner.java

@@ -20,6 +20,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /** Default {@link MapRunnable} implementation.*/
 public class MapRunner implements MapRunnable {
@@ -30,7 +31,8 @@ public class MapRunner implements MapRunnable {
 
   public void configure(JobConf job) {
     this.job = job;
-    this.mapper = (Mapper)job.newInstance(job.getMapperClass());
+    this.mapper = (Mapper)ReflectionUtils.newInstance(job.getMapperClass(),
+                                                      job);
     this.inputKeyClass = job.getInputKeyClass();
     this.inputValueClass = job.getInputValueClass();
   }
@@ -41,8 +43,9 @@ public class MapRunner implements MapRunnable {
     try {
       // allocate key & value instances that are re-used for all entries
       WritableComparable key =
-        (WritableComparable)job.newInstance(inputKeyClass);
-      Writable value = (Writable)job.newInstance(inputValueClass);
+        (WritableComparable)ReflectionUtils.newInstance(inputKeyClass, job);
+      Writable value = (Writable)ReflectionUtils.newInstance(inputValueClass,
+                                                             job);
       while (input.next(key, value)) {
         // map pair to output
         mapper.map(key, value, output, reporter);

+ 3 - 2
src/java/org/apache/hadoop/mapred/MapTask.java

@@ -25,6 +25,7 @@ import org.apache.hadoop.metrics.MetricsRecord;
 
 import org.apache.commons.logging.*;
 import org.apache.hadoop.metrics.Metrics;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /** A Map task. */
 class MapTask extends Task {
@@ -142,7 +143,7 @@ class MapTask extends Task {
       }
 
       final Partitioner partitioner =
-        (Partitioner)job.newInstance(job.getPartitionerClass());
+        (Partitioner)ReflectionUtils.newInstance(job.getPartitionerClass(), job);
 
       OutputCollector partCollector = new OutputCollector() { // make collector
           public synchronized void collect(WritableComparable key,
@@ -188,7 +189,7 @@ class MapTask extends Task {
         };
 
       MapRunnable runner =
-        (MapRunnable)job.newInstance(job.getMapRunnerClass());
+        (MapRunnable)ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
 
       try {
         runner.run(in, collector, reporter);      // run the map

+ 2 - 1
src/java/org/apache/hadoop/mapred/ReduceTask.java

@@ -196,7 +196,8 @@ class ReduceTask extends Task {
   public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
     throws IOException {
     Class valueClass = job.getMapOutputValueClass();
-    Reducer reducer = (Reducer)job.newInstance(job.getReducerClass());
+    Reducer reducer = (Reducer)ReflectionUtils.newInstance(
+                                  job.getReducerClass(), job);
     reducer.configure(job);
     FileSystem lfs = FileSystem.getNamed("local", job);
 

+ 306 - 0
src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java

@@ -0,0 +1,306 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.security.DigestException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A class that allows a map/reduce job to work on a sample of sequence files.
+ * The sample is decided by the filter class set by the job.
+ * 
+ * @author hairong
+ *
+ */
+
+public class SequenceFileInputFilter extends SequenceFileInputFormat {
+    final private static String FILTER_CLASS = "sequencefile.filter.class";
+    final private static String FILTER_FREQUENCY
+                       = "sequencefile.filter.frequency";
+    final private static String FILTER_REGEX = "sequencefile.filter.regex";
+    
+    public SequenceFileInputFilter() {
+    }
+    
+    /** Create a record reader for the given split
+     * @param fs file system where the file split is stored
+     * @param split file split
+     * @param job job configuration
+     * @param reporter reporter who sends report to task tracker
+     * @return RecordReader
+     */
+    public RecordReader getRecordReader(FileSystem fs, FileSplit split,
+            JobConf job, Reporter reporter)
+    throws IOException {
+        
+        reporter.setStatus(split.toString());
+        
+        return new FilterRecordReader(job, split);
+    }
+
+
+    /** set the filter class
+     * 
+     * @param conf application configuration
+     * @param filterClass filter class
+     */
+    public static void setFilterClass(Configuration conf, Class filterClass) {
+        conf.set(FILTER_CLASS, filterClass.getName() );
+    }
+
+         
+    /**
+     * filter interface
+     */
+    public interface Filter extends Configurable {
+        /** filter function
+         * Decide if a record should be filtered or not
+         * @param key record key
+         * @return true if a record is accepted; return false otherwise
+         */
+        public abstract boolean accept(Writable key);
+    }
+    
+    /**
+     * base class for Filters
+     */
+    public static abstract class FilterBase implements Filter {
+        Configuration conf;
+        
+        public Configuration getConf() {
+            return conf;
+        }
+    }
+    
+    /** Filters records by matching the key against a regex
+     */
+    public static class RegexFilter extends FilterBase {
+        private Pattern p;
+        /** Define the filtering regex and store it in conf
+         * @param conf configuration where the regex is set
+         * @param regex regex used as a filter
+         */
+        public static void setPattern(Configuration conf, String regex )
+            throws PatternSyntaxException {
+            try {
+                Pattern.compile(regex);
+            } catch (PatternSyntaxException e) {
+                throw new IllegalArgumentException("Invalid pattern: "+regex);
+            }
+            conf.set(FILTER_REGEX, regex);
+        }
+        
+        public RegexFilter() { }
+        
+        /** configure the Filter by checking the configuration
+         */
+        public void setConf(Configuration conf) {
+            String regex = conf.get(FILTER_REGEX);
+            if(regex==null)
+                throw new RuntimeException(FILTER_REGEX + " not set");
+            this.p = Pattern.compile(regex);
+            this.conf = conf;
+        }
+
+
+        /** Filtering method
+         * If key matches the regex, return true; otherwise return false
+         * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(org.apache.hadoop.io.Writable)
+         */
+        public boolean accept(Writable key) {
+            return p.matcher(key.toString()).matches();
+        }
+    }
+
+    /** This class returns a percentage of records.
+     * The percentage is determined by a filtering frequency <i>f</i> using
+     * the criterion record# % f == 0.
+     * For example, if the frequency is 10, one out of 10 records is returned.
+     */
+    public static class PercentFilter extends FilterBase {
+        private int frequency;
+        private int count;
+
+        /** set the frequency and stores it in conf
+         * @param conf configuration
+         * @param frequency filtering frequency
+         */
+        public static void setFrequency(Configuration conf, int frequency ){
+            if(frequency<=0)
+                throw new IllegalArgumentException(
+                   "Negative " + FILTER_FREQUENCY + ": "+frequency);
+            conf.setInt(FILTER_FREQUENCY, frequency);
+        }
+        
+        public PercentFilter() { }
+        
+        /** configure the filter by checking the configuration
+         * 
+         * @param conf configuration
+         */
+        public void setConf(Configuration conf) {
+            this.frequency = conf.getInt(FILTER_FREQUENCY, 10);
+            if(this.frequency <=0 ) {
+                throw new RuntimeException(
+                        "Negative "+FILTER_FREQUENCY+": "+this.frequency);
+            }
+            this.conf = conf;
+        }
+
+        /** Filtering method
+         * If record# % frequency==0, return true; otherwise return false
+         * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(org.apache.hadoop.io.Writable)
+         */
+        public boolean accept(Writable key) {
+            boolean accepted = false;
+            if(count == 0)
+                accepted = true;
+            if( ++count == frequency ) {
+                count = 0;
+            }
+            return accepted;
+        }
+    }
+
+    /** This class returns a set of records by examining the MD5 digest of its
+     * key against a filtering frequency <i>f</i>. The filtering criterion is
+     * MD5(key) % f == 0.
+     */
+    public static class MD5Filter extends FilterBase {
+        private int frequency;
+        private static final MessageDigest DIGESTER;
+        public static final int MD5_LEN = 16;
+        private byte [] digest = new byte[MD5_LEN];
+        
+        static {
+          try {
+            DIGESTER = MessageDigest.getInstance("MD5");
+          } catch (NoSuchAlgorithmException e) {
+            throw new RuntimeException(e);
+          }
+        }
+
+
+        /** set the filtering frequency in configuration
+         * 
+         * @param conf configuration
+         * @param frequency filtering frequency
+         */
+        public static void setFrequency(Configuration conf, int frequency ){
+            if(frequency<=0)
+                throw new IllegalArgumentException(
+                   "Negative " + FILTER_FREQUENCY + ": "+frequency);
+            conf.setInt(FILTER_FREQUENCY, frequency);
+        }
+        
+        public MD5Filter() { }
+        
+        /** configure the filter according to configuration
+         * 
+         * @param conf configuration
+         */
+        public void setConf(Configuration conf) {
+            this.frequency = conf.getInt(FILTER_FREQUENCY, 10);
+            if(this.frequency <=0 ) {
+                throw new RuntimeException(
+                        "Negative "+FILTER_FREQUENCY+": "+this.frequency);
+            }
+            this.conf = conf;
+        }
+
+        /** Filtering method
+         * If MD5(key) % frequency==0, return true; otherwise return false
+         * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(org.apache.hadoop.io.Writable)
+         */
+        public boolean accept(Writable key) {
+            try {
+                long hashcode;
+                if( key instanceof Text) {
+                    hashcode = MD5Hashcode((Text)key);
+                } else if( key instanceof BytesWritable) {
+                    hashcode = MD5Hashcode((BytesWritable)key);
+                } else {
+                    ByteBuffer bb;
+                    bb = Text.encode(key.toString());
+                    hashcode = MD5Hashcode(bb.array(),0, bb.limit());
+                }
+                if(hashcode/frequency*frequency==hashcode)
+                    return true;
+            } catch(Exception e) {
+                LOG.warn(e);
+                throw new RuntimeException(e);
+            }
+            return false;
+        }
+        
+        private long MD5Hashcode(Text key) throws DigestException {
+            return MD5Hashcode(key.getBytes(), 0, key.getLength());
+        }
+        
+        private long MD5Hashcode(BytesWritable key) throws DigestException {
+            return MD5Hashcode(key.get(), 0, key.getSize());
+        }
+        synchronized private long MD5Hashcode(byte[] bytes, 
+                int start, int length ) throws DigestException {
+            DIGESTER.update(bytes, 0, length);
+            DIGESTER.digest(digest, 0, MD5_LEN);
+            long hashcode=0;
+            for (int i = 0; i < 8; i++)
+                hashcode |= ((digest[i] & 0xffL) << (8*(7-i)));
+            return hashcode;
+        }
+    }
+    
+    private static class FilterRecordReader extends SequenceFileRecordReader {
+        private Filter filter;
+        
+        public FilterRecordReader(Configuration conf, FileSplit split)
+        throws IOException {
+            super(conf, split);
+            // instantiate filter
+            filter = (Filter)ReflectionUtils.newInstance(
+                    conf.getClass(FILTER_CLASS, PercentFilter.class), 
+                    conf);
+        }
+        
+        public synchronized boolean next(Writable key, Writable value)
+                              throws IOException {
+            while (next(key)) {
+                if(filter.accept(key)) {
+                    getCurrentValue(value);
+                    return true;
+                }
+            }
+            
+            return false;
+        }
+    }
+}

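Besides the three bundled filters, the Filter interface above admits user-defined filters, since the record reader instantiates whatever class is set under sequencefile.filter.class. A hedged sketch of a custom filter; the class name and its config key are hypothetical:

    // Accepts only keys no longer than a configured length.
    public class KeyLengthFilter implements SequenceFileInputFilter.Filter {
      private Configuration conf;
      private int maxLen;

      public void setConf(Configuration conf) {  // called by ReflectionUtils.newInstance
        this.conf = conf;
        this.maxLen = conf.getInt("example.filter.max.key.length", 8);  // hypothetical key
      }

      public Configuration getConf() { return conf; }

      public boolean accept(Writable key) {
        return key.toString().length() <= maxLen;
      }
    }

It would be registered like the bundled filters: SequenceFileInputFilter.setFilterClass(job, KeyLengthFilter.class).
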
+ 21 - 0
src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java

@@ -64,10 +64,31 @@ public class SequenceFileRecordReader implements RecordReader {
     return more;
   }
   
+  protected synchronized boolean next(Writable key)
+      throws IOException {
+      if (!more) return false;
+      long pos = in.getPosition();
+      boolean eof = in.next(key);
+      if (pos >= end && in.syncSeen()) {
+          more = false;
+      } else {
+          more = eof;
+      }
+      return more;
+  }
+  
+  protected synchronized void getCurrentValue(Writable value)
+      throws IOException {
+      in.getCurrentValue(value);
+  }
+  
   public synchronized long getPos() throws IOException {
     return in.getPosition();
   }
   
+  protected synchronized void seek(long pos) throws IOException {
+      in.seek(pos);
+  }
   public synchronized void close() throws IOException { in.close(); }
   
 }

+ 58 - 0
src/java/org/apache/hadoop/util/ReflectionUtils.java

@@ -0,0 +1,58 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.lang.reflect.Constructor;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.mapred.*;
+
+/**
+ * General reflection utils
+ */
+
+public class ReflectionUtils {
+    
+    private static final Class[] emptyArray = new Class[]{};
+
+    /** Create an object for the given class and initialize it from conf
+     * 
+     * @param theClass class of which an object is created
+     * @param conf Configuration
+     * @return a new object
+     */
+    public static Object newInstance(Class theClass, Configuration conf) {
+        Object result;
+        try {
+            Constructor meth = theClass.getDeclaredConstructor(emptyArray);
+            meth.setAccessible(true);
+            result = meth.newInstance(emptyArray);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        if (conf != null) {
+            if (result instanceof Configurable) {
+                ((Configurable) result).setConf(conf);
+            }
+            if (conf instanceof JobConf && 
+                    result instanceof JobConfigurable) {
+                ((JobConfigurable)result).configure((JobConf) conf);
+            }
+        }
+        return result;
+    }
+}

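Every call site updated in this commit follows the same one-line pattern, e.g. from MapRunner above:

    // Construct via the no-arg constructor, then call setConf() for Configurables
    // and configure() for JobConfigurables when conf is a JobConf.
    Mapper mapper = (Mapper) ReflectionUtils.newInstance(job.getMapperClass(), job);
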
+ 175 - 0
src/test/org/apache/hadoop/mapred/TestSequenceFileInputFilter.java

@@ -0,0 +1,175 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.*;
+import java.util.*;
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.*;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.conf.*;
+
+public class TestSequenceFileInputFilter extends TestCase {
+  private static final Log LOG = InputFormatBase.LOG;
+
+  private static final int MAX_LENGTH = 15000;
+  private static final Configuration conf = new Configuration();
+  private static final JobConf job = new JobConf(conf);
+  private static final FileSystem fs;
+  private static final Path inDir = new Path(System.getProperty("test.build.data",".") + "/mapred");
+  private static final Path inFile = new Path(inDir, "test.seq");
+  private static final Random random = new Random(1);
+  private static final Reporter reporter = new Reporter() {
+      public void setStatus(String status) throws IOException {}
+      public void progress() throws IOException {}
+  };
+  
+  static {
+      job.setInputPath(inDir);
+      try {
+        fs = FileSystem.getNamed( "local", conf);
+    } catch (IOException e) {
+        e.printStackTrace();
+        throw new RuntimeException(e);
+    }
+  }
+
+  private static void createSequenceFile(int numRecords) throws Exception {
+      // create a file with numRecords entries
+      SequenceFile.Writer writer =
+          new SequenceFile.Writer(fs, inFile,
+                  Text.class, BytesWritable.class);
+      try {
+          for (int i = 1; i <= numRecords; i++) {
+              Text key = new Text(Integer.toString(i));
+              byte[] data = new byte[random.nextInt(10)];
+              random.nextBytes(data);
+              BytesWritable value = new BytesWritable(data);
+              writer.append(key, value);
+          }
+      } finally {
+          writer.close();
+      }
+  }
+
+
+  private int countRecords(int numSplits) throws IOException {
+      InputFormat format = new SequenceFileInputFilter();
+      Text key = new Text();
+      BytesWritable value = new BytesWritable();
+      if(numSplits==0) {
+        numSplits =
+            random.nextInt(MAX_LENGTH/(SequenceFile.SYNC_INTERVAL/20))+1;
+      }
+      FileSplit[] splits = format.getSplits(fs, job, numSplits);
+      
+      // check each split
+      int count = 0;
+      for (int j = 0; j < splits.length; j++) {
+          RecordReader reader =
+              format.getRecordReader(fs, splits[j], job, reporter);
+          try {
+              while (reader.next(key, value)) {
+                  LOG.info("Accept record "+key.toString());
+                  count++;
+              }
+          } finally {
+              reader.close();
+          }
+      }
+      return count;
+  }
+  
+  public void testRegexFilter() throws Exception {
+    // set the filter class
+    LOG.info("Testing Regex Filter with patter: \\A10*");
+    SequenceFileInputFilter.setFilterClass(job, 
+            SequenceFileInputFilter.RegexFilter.class);
+    SequenceFileInputFilter.RegexFilter.setPattern(job, "\\A10*");
+    
+    // clean input dir
+    fs.delete(inDir);
+  
+    // for a variety of lengths
+    for (int length = 1; length < MAX_LENGTH;
+               length+= random.nextInt(MAX_LENGTH/10)+1) {
+        LOG.info("******Number of records: "+length);
+        createSequenceFile(length);
+        int count = countRecords(0);
+        assertEquals(count, length==0?0:(int)Math.log10(length)+1);
+    }
+    
+    // clean up
+    fs.delete(inDir);
+  }
+
+  public void testPercentFilter() throws Exception {
+      LOG.info("Testing Percent Filter with frequency: 1000");
+      // set the filter class
+      SequenceFileInputFilter.setFilterClass(job, 
+              SequenceFileInputFilter.PercentFilter.class);
+      SequenceFileInputFilter.PercentFilter.setFrequency(job, 1000);
+      
+      // clean input dir
+      fs.delete(inDir);
+    
+      // for a variety of lengths
+      for (int length = 0; length < MAX_LENGTH;
+                 length+= random.nextInt(MAX_LENGTH/10)+1) {
+          LOG.info("******Number of records: "+length);
+          createSequenceFile(length);
+          int count = countRecords(1);
+          LOG.info("Accepted "+count+" records");
+          int expectedCount = length/1000;
+          if(expectedCount*1000!=length)
+              expectedCount++;
+          assertEquals(count, expectedCount);
+      }
+      
+      // clean up
+      fs.delete(inDir);
+  }
+  
+  public void testMD5Filter() throws Exception {
+      // set the filter class
+      LOG.info("Testing MD5 Filter with frequency: 1000");
+      SequenceFileInputFilter.setFilterClass(job, 
+              SequenceFileInputFilter.MD5Filter.class);
+      SequenceFileInputFilter.MD5Filter.setFrequency(job, 1000);
+      
+      // clean input dir
+      fs.delete(inDir);
+    
+      // for a variety of lengths
+      for (int length = 0; length < MAX_LENGTH;
+                 length+= random.nextInt(MAX_LENGTH/10)+1) {
+          LOG.info("******Number of records: "+length);
+          createSequenceFile(length);
+          LOG.info("Accepted "+countRecords(0)+" records");
+      }
+      // clean up
+      fs.delete(inDir);
+    }
+
+  public static void main(String[] args) throws Exception {
+    TestSequenceFileInputFilter filter = new TestSequenceFileInputFilter();
+    filter.testRegexFilter();
+  }
+}