
HADOOP-3063 BloomMapFile - fail-fast version of MapFile for sparsely populated key space

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@726797 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack, 16 years ago (commit 4ba168e487)

+ 3 - 0
CHANGES.txt

@@ -86,6 +86,9 @@ Trunk (unreleased changes)
 
 
     HADOOP-4826. Introduce admin command saveNamespace. (shv)
 
+    HADOOP-3063  BloomMapFile - fail-fast version of MapFile for sparsely
+                 populated key space (Andrzej Bialecki via stack)
+
   IMPROVEMENTS
 
     HADOOP-4749. Added a new counter REDUCE_INPUT_BYTES. (Yongqiang He via 

+ 42 - 0
LICENSE.txt

@@ -200,3 +200,45 @@
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
+
+
+APACHE HADOOP SUBCOMPONENTS:
+
+The Apache Hadoop project contains subcomponents with separate copyright
+notices and license terms. Your use of the source code for these
+subcomponents is subject to the terms and conditions of the following
+licenses. 
+
+For the org.apache.hadoop.util.bloom.* classes:
+
+/**
+ *
+ * Copyright (c) 2005, European Commission project OneLab under contract
+ * 034819 (http://www.one-lab.org)
+ * All rights reserved.
+ * Redistribution and use in source and binary forms, with or 
+ * without modification, are permitted provided that the following 
+ * conditions are met:
+ *  - Redistributions of source code must retain the above copyright 
+ *    notice, this list of conditions and the following disclaimer.
+ *  - Redistributions in binary form must reproduce the above copyright 
+ *    notice, this list of conditions and the following disclaimer in 
+ *    the documentation and/or other materials provided with the distribution.
+ *  - Neither the name of the University Catholique de Louvain - UCL
+ *    nor the names of its contributors may be used to endorse or 
+ *    promote products derived from this software without specific prior 
+ *    written permission.
+ *    
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
+ * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
+ * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
+ * POSSIBILITY OF SUCH DAMAGE.
+ */

+ 28 - 0
conf/hadoop-default.xml

@@ -1263,6 +1263,34 @@ creations/deletions), or "all".</description>
   </description>
 </property>
 
+<property>
+  <name>io.mapfile.bloom.size</name>
+  <value>1048576</value>
+  <description>The size of the BloomFilters used in BloomMapFile. Each time this many
+  keys have been appended, the next BloomFilter is created (inside a DynamicBloomFilter).
+  Larger values minimize the number of filters, which slightly improves performance,
+  but may waste too much space if the total number of keys is usually much smaller
+  than this number.
+  </description>
+</property>
+
+<property>
+  <name>io.mapfile.bloom.error.rate</name>
+  <value>0.005</value>
+  <description>The rate of false positives in the BloomFilters used in BloomMapFile.
+  As this value decreases, the size of the BloomFilters increases exponentially. This
+  value is the probability of encountering false positives (the default is 0.5%).
+  </description>
+</property>
+
+<property>
+  <name>hadoop.util.hash.type</name>
+  <value>murmur</value>
+  <description>The default implementation of Hash. Currently this can take one of
+  two values: 'murmur' to select MurmurHash and 'jenkins' to select JenkinsHash.
+  </description>
+</property>
+
 <property>
   <name>map.sort.class</name>
   <value>org.apache.hadoop.util.QuickSort</value>
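These settings can also be supplied programmatically; a minimal sketch (the tuned values here are arbitrary, not defaults from the patch):

    Configuration conf = new Configuration();
    conf.setInt("io.mapfile.bloom.size", 256 * 1024);      // keys per filter row
    conf.setFloat("io.mapfile.bloom.error.rate", 0.01f);   // 1% false positives
    conf.set("hadoop.util.hash.type", "jenkins");          // or "murmur"
    // A BloomMapFile.Writer built with this conf picks the values up in
    // initBloomFilter(conf).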

+ 259 - 0
src/core/org/apache/hadoop/io/BloomMapFile.java

@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.bloom.DynamicBloomFilter;
+import org.apache.hadoop.util.bloom.Filter;
+import org.apache.hadoop.util.bloom.Key;
+import org.apache.hadoop.util.hash.Hash;
+
+/**
+ * This class extends {@link MapFile} and provides much the same
+ * functionality. However, it uses dynamic Bloom filters to provide a
+ * quick membership test for keys, and it offers a fast version of the
+ * {@link Reader#get(WritableComparable, Writable)} operation, especially
+ * for sparsely populated MapFiles.
+ */
+public class BloomMapFile {
+  private static final Log LOG = LogFactory.getLog(BloomMapFile.class);
+  public static final String BLOOM_FILE_NAME = "bloom";
+  public static final int HASH_COUNT = 5;
+  
+  public static void delete(FileSystem fs, String name) throws IOException {
+    Path dir = new Path(name);
+    Path data = new Path(dir, MapFile.DATA_FILE_NAME);
+    Path index = new Path(dir, MapFile.INDEX_FILE_NAME);
+    Path bloom = new Path(dir, BLOOM_FILE_NAME);
+
+    fs.delete(data, true);
+    fs.delete(index, true);
+    fs.delete(bloom, true);
+    fs.delete(dir, true);
+  }
+  
+  public static class Writer extends MapFile.Writer {
+    private DynamicBloomFilter bloomFilter;
+    private int numKeys;
+    private int vectorSize;
+    private Key bloomKey = new Key();
+    private DataOutputBuffer buf = new DataOutputBuffer();
+    private FileSystem fs;
+    private Path dir;
+    
+    public Writer(Configuration conf, FileSystem fs, String dirName,
+        Class<? extends WritableComparable> keyClass,
+        Class<? extends Writable> valClass, CompressionType compress,
+        CompressionCodec codec, Progressable progress) throws IOException {
+      super(conf, fs, dirName, keyClass, valClass, compress, codec, progress);
+      this.fs = fs;
+      this.dir = new Path(dirName);
+      initBloomFilter(conf);
+    }
+
+    public Writer(Configuration conf, FileSystem fs, String dirName,
+        Class<? extends WritableComparable> keyClass,
+        Class valClass, CompressionType compress,
+        Progressable progress) throws IOException {
+      super(conf, fs, dirName, keyClass, valClass, compress, progress);
+      this.fs = fs;
+      this.dir = new Path(dirName);
+      initBloomFilter(conf);
+    }
+
+    public Writer(Configuration conf, FileSystem fs, String dirName,
+        Class<? extends WritableComparable> keyClass,
+        Class valClass, CompressionType compress)
+        throws IOException {
+      super(conf, fs, dirName, keyClass, valClass, compress);
+      this.fs = fs;
+      this.dir = new Path(dirName);
+      initBloomFilter(conf);
+    }
+
+    public Writer(Configuration conf, FileSystem fs, String dirName,
+        WritableComparator comparator, Class valClass,
+        CompressionType compress, CompressionCodec codec, Progressable progress)
+        throws IOException {
+      super(conf, fs, dirName, comparator, valClass, compress, codec, progress);
+      this.fs = fs;
+      this.dir = new Path(dirName);
+      initBloomFilter(conf);
+    }
+
+    public Writer(Configuration conf, FileSystem fs, String dirName,
+        WritableComparator comparator, Class valClass,
+        CompressionType compress, Progressable progress) throws IOException {
+      super(conf, fs, dirName, comparator, valClass, compress, progress);
+      this.fs = fs;
+      this.dir = new Path(dirName);
+      initBloomFilter(conf);
+    }
+
+    public Writer(Configuration conf, FileSystem fs, String dirName,
+        WritableComparator comparator, Class valClass, CompressionType compress)
+        throws IOException {
+      super(conf, fs, dirName, comparator, valClass, compress);
+      this.fs = fs;
+      this.dir = new Path(dirName);
+      initBloomFilter(conf);
+    }
+
+    public Writer(Configuration conf, FileSystem fs, String dirName,
+        WritableComparator comparator, Class valClass) throws IOException {
+      super(conf, fs, dirName, comparator, valClass);
+      this.fs = fs;
+      this.dir = new Path(dirName);
+      initBloomFilter(conf);
+    }
+
+    public Writer(Configuration conf, FileSystem fs, String dirName,
+        Class<? extends WritableComparable> keyClass,
+        Class valClass) throws IOException {
+      super(conf, fs, dirName, keyClass, valClass);
+      this.fs = fs;
+      this.dir = new Path(dirName);
+      initBloomFilter(conf);
+    }
+
+    private synchronized void initBloomFilter(Configuration conf) {
+      numKeys = conf.getInt("io.mapfile.bloom.size", 1024 * 1024);
+      // The optimal vector size is <code>-kn / ln(1 - c^(1/k))</code> bits,
+      // where <code>k</code> is the number of hash functions,
+      // <code>n</code> is the number of keys and <code>c</code> is the
+      // desired maximum error rate.
+      // Our desired error rate is by default 0.005, i.e. 0.5%.
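+      // With the defaults (k = 5, n = 1,048,576 keys, c = 0.005) this works
+      // out to roughly 12.3 million bits, i.e. about 1.5 MB per filter row.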
+      float errorRate = conf.getFloat("io.mapfile.bloom.error.rate", 0.005f);
+      vectorSize = (int)Math.ceil((double)(-HASH_COUNT * numKeys) /
+          Math.log(1.0 - Math.pow(errorRate, 1.0/HASH_COUNT)));
+      bloomFilter = new DynamicBloomFilter(vectorSize, HASH_COUNT,
+          Hash.getHashType(conf), numKeys);
+    }
+
+    @Override
+    public synchronized void append(WritableComparable key, Writable val)
+        throws IOException {
+      super.append(key, val);
+      buf.reset();
+      key.write(buf);
+      // copy only the valid bytes: getData() exposes the whole backing
+      // array, which may hold stale bytes from a previous, longer key
+      bloomKey.set(Arrays.copyOf(buf.getData(), buf.getLength()), 1.0);
+      bloomFilter.add(bloomKey);
+    }
+
+    @Override
+    public synchronized void close() throws IOException {
+      super.close();
+      DataOutputStream out = fs.create(new Path(dir, BLOOM_FILE_NAME), true);
+      bloomFilter.write(out);
+      out.flush();
+      out.close();
+    }
+
+  }
+  
+  public static class Reader extends MapFile.Reader {
+    private DynamicBloomFilter bloomFilter;
+    private DataOutputBuffer buf = new DataOutputBuffer();
+    private Key bloomKey = new Key();
+
+    public Reader(FileSystem fs, String dirName, Configuration conf)
+        throws IOException {
+      super(fs, dirName, conf);
+      initBloomFilter(fs, dirName, conf);
+    }
+
+    public Reader(FileSystem fs, String dirName, WritableComparator comparator,
+        Configuration conf, boolean open) throws IOException {
+      super(fs, dirName, comparator, conf, open);
+      initBloomFilter(fs, dirName, conf);
+    }
+
+    public Reader(FileSystem fs, String dirName, WritableComparator comparator,
+        Configuration conf) throws IOException {
+      super(fs, dirName, comparator, conf);
+      initBloomFilter(fs, dirName, conf);
+    }
+    
+    private void initBloomFilter(FileSystem fs, String dirName,
+        Configuration conf) {
+      try {
+        DataInputStream in = fs.open(new Path(dirName, BLOOM_FILE_NAME));
+        bloomFilter = new DynamicBloomFilter();
+        bloomFilter.readFields(in);
+        in.close();
+      } catch (IOException ioe) {
+        LOG.warn("Can't open BloomFilter: " + ioe + " - fallback to MapFile.");
+        bloomFilter = null;
+      }
+    }
+    
+    /**
+     * Checks if this MapFile has the indicated key. The membership test is
+     * performed using a Bloom filter, so the result always has a non-zero
+     * probability of false positives.
+     * @param key key to check
+     * @return false iff the key definitely does not exist, true if it probably exists.
+     * @throws IOException
+     */
+    public boolean probablyHasKey(WritableComparable key) throws IOException {
+      if (bloomFilter == null) {
+        return true;
+      }
+      buf.reset();
+      key.write(buf);
+      bloomKey.set(Arrays.copyOf(buf.getData(), buf.getLength()), 1.0);
+      return bloomFilter.membershipTest(bloomKey);
+    }
+    
+    /**
+     * Fast version of the
+     * {@link MapFile.Reader#get(WritableComparable, Writable)} method. It
+     * first checks the Bloom filter for the existence of the key, and
+     * performs the real get operation only if the key may be present. This
+     * yields significant performance improvements for get operations on
+     * sparsely populated files.
+     */
+    @Override
+    public synchronized Writable get(WritableComparable key, Writable val)
+        throws IOException {
+      if (!probablyHasKey(key)) {
+        return null;
+      }
+      return super.get(key, val);
+    }
+    
+    /**
+     * Retrieve the Bloom filter used by this instance of the Reader.
+     * @return a Bloom filter (see {@link Filter})
+     */
+    public Filter getBloomFilter() {
+      return bloomFilter;
+    }
+  }
+}
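Taken together, the class is used like a plain MapFile; a minimal sketch (the path, key type and value type are illustrative, not part of the patch):

    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    String dir = "/tmp/bloom-demo";   // hypothetical location

    // MapFile semantics still apply: keys must be appended in sorted order.
    BloomMapFile.Writer writer = new BloomMapFile.Writer(conf, fs, dir,
        IntWritable.class, Text.class);
    for (int i = 0; i < 1000; i += 2) {   // sparse: even keys only
      writer.append(new IntWritable(i), new Text("v" + i));
    }
    writer.close();   // also persists the "bloom" file

    BloomMapFile.Reader reader = new BloomMapFile.Reader(fs, dir, conf);
    // Misses (odd keys) are almost always rejected by the Bloom filter
    // without touching the data file; hits fall through to MapFile's get().
    Writable hit  = reader.get(new IntWritable(42), new Text());   // non-null
    Writable miss = reader.get(new IntWritable(43), new Text());   // null
    reader.close();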

+ 234 - 0
src/core/org/apache/hadoop/util/bloom/BloomFilter.java

@@ -0,0 +1,234 @@
+/**
+ *
+ * Copyright (c) 2005, European Commission project OneLab under contract 034819 (http://www.one-lab.org)
+ * All rights reserved.
+ * Redistribution and use in source and binary forms, with or 
+ * without modification, are permitted provided that the following 
+ * conditions are met:
+ *  - Redistributions of source code must retain the above copyright 
+ *    notice, this list of conditions and the following disclaimer.
+ *  - Redistributions in binary form must reproduce the above copyright 
+ *    notice, this list of conditions and the following disclaimer in 
+ *    the documentation and/or other materials provided with the distribution.
+ *  - Neither the name of the University Catholique de Louvain - UCL
+ *    nor the names of its contributors may be used to endorse or 
+ *    promote products derived from this software without specific prior 
+ *    written permission.
+ *    
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
+ * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
+ * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util.bloom;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import java.util.BitSet;
+
+/**
+ * Implements a <i>Bloom filter</i>, as defined by Bloom in 1970.
+ * <p>
+ * The Bloom filter is a data structure that was introduced in 1970 and that has been adopted by 
+ * the networking research community in the past decade thanks to the bandwidth efficiencies that it
+ * offers for the transmission of set membership information between networked hosts.  A sender encodes 
+ * the information into a bit vector, the Bloom filter, that is more compact than a conventional 
+ * representation. Computation and space costs for construction are linear in the number of elements.  
+ * The receiver uses the filter to test whether various elements are members of the set. Though the 
+ * filter will occasionally return a false positive, it will never return a false negative. When creating 
+ * the filter, the sender can choose its desired point in a trade-off between the false positive rate and the size. 
+ * 
+ * <p>
+ * Originally created by
+ * <a href="http://www.one-lab.org">European Commission One-Lab Project 034819</a>.
+ * 
+ * @see Filter The general behavior of a filter
+ * 
+ * @see <a href="http://portal.acm.org/citation.cfm?id=362692&dl=ACM&coll=portal">Space/Time Trade-Offs in Hash Coding with Allowable Errors</a>
+ */
+public class BloomFilter extends Filter {
+  private static final byte[] bitvalues = new byte[] {
+    (byte)0x01,
+    (byte)0x02,
+    (byte)0x04,
+    (byte)0x08,
+    (byte)0x10,
+    (byte)0x20,
+    (byte)0x40,
+    (byte)0x80
+  };
+  
+  /** The bit vector. */
+  BitSet bits;
+
+  /** Default constructor - use with readFields */
+  public BloomFilter() {
+    super();
+  }
+  
+  /**
+   * Constructor
+   * @param vectorSize The vector size of <i>this</i> filter.
+   * @param nbHash The number of hash functions to consider.
+   * @param hashType type of the hashing function (see
+   * {@link org.apache.hadoop.util.hash.Hash}).
+   */
+  public BloomFilter(int vectorSize, int nbHash, int hashType) {
+    super(vectorSize, nbHash, hashType);
+
+    bits = new BitSet(this.vectorSize);
+  }
+
+  @Override
+  public void add(Key key) {
+    if(key == null) {
+      throw new NullPointerException("key cannot be null");
+    }
+
+    int[] h = hash.hash(key);
+    hash.clear();
+
+    for(int i = 0; i < nbHash; i++) {
+      bits.set(h[i]);
+    }
+  }
+
+  @Override
+  public void and(Filter filter) {
+    if(filter == null
+        || !(filter instanceof BloomFilter)
+        || filter.vectorSize != this.vectorSize
+        || filter.nbHash != this.nbHash) {
+      throw new IllegalArgumentException("filters cannot be and-ed");
+    }
+
+    this.bits.and(((BloomFilter) filter).bits);
+  }
+
+  @Override
+  public boolean membershipTest(Key key) {
+    if(key == null) {
+      throw new NullPointerException("key cannot be null");
+    }
+
+    int[] h = hash.hash(key);
+    hash.clear();
+    for(int i = 0; i < nbHash; i++) {
+      if(!bits.get(h[i])) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public void not() {
+    // BitSet#flip(int, int) is exclusive of its second argument, so flip
+    // the full range [0, vectorSize)
+    bits.flip(0, vectorSize);
+  }
+
+  @Override
+  public void or(Filter filter) {
+    if(filter == null
+        || !(filter instanceof BloomFilter)
+        || filter.vectorSize != this.vectorSize
+        || filter.nbHash != this.nbHash) {
+      throw new IllegalArgumentException("filters cannot be or-ed");
+    }
+    bits.or(((BloomFilter) filter).bits);
+  }
+
+  @Override
+  public void xor(Filter filter) {
+    if(filter == null
+        || !(filter instanceof BloomFilter)
+        || filter.vectorSize != this.vectorSize
+        || filter.nbHash != this.nbHash) {
+      throw new IllegalArgumentException("filters cannot be xor-ed");
+    }
+    bits.xor(((BloomFilter) filter).bits);
+  }
+
+  @Override
+  public String toString() {
+    return bits.toString();
+  }
+
+  /**
+   * @return the size of the Bloom filter
+   */
+  public int getVectorSize() {
+    return this.vectorSize;
+  }
+
+  // Writable
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    byte[] bytes = new byte[getNBytes()];
+    for(int i = 0, byteIndex = 0, bitIndex = 0; i < vectorSize; i++, bitIndex++) {
+      if (bitIndex == 8) {
+        bitIndex = 0;
+        byteIndex++;
+      }
+      if (bitIndex == 0) {
+        bytes[byteIndex] = 0;
+      }
+      if (bits.get(i)) {
+        bytes[byteIndex] |= bitvalues[bitIndex];
+      }
+    }
+    out.write(bytes);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    bits = new BitSet(this.vectorSize);
+    byte[] bytes = new byte[getNBytes()];
+    in.readFully(bytes);
+    for(int i = 0, byteIndex = 0, bitIndex = 0; i < vectorSize; i++, bitIndex++) {
+      if (bitIndex == 8) {
+        bitIndex = 0;
+        byteIndex++;
+      }
+      if ((bytes[byteIndex] & bitvalues[bitIndex]) != 0) {
+        bits.set(i);
+      }
+    }
+  }
+  
+  /** @return the number of bytes needed to hold the bit vector */
+  private int getNBytes() {
+    return (vectorSize + 7) / 8;
+  }
+}//end class
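The contract of the class above in a few lines; a sketch (sizes and keys arbitrary; Hash.MURMUR_HASH is a constant of the org.apache.hadoop.util.hash.Hash class this patch builds on):

    BloomFilter bf = new BloomFilter(1 << 20, 5, Hash.MURMUR_HASH);
    bf.add(new Key("apple".getBytes()));
    bf.add(new Key("banana".getBytes()));

    bf.membershipTest(new Key("apple".getBytes()));    // always true: no false negatives
    bf.membershipTest(new Key("cherry".getBytes()));   // almost always false; true here
                                                       // would be a false positive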

+ 305 - 0
src/core/org/apache/hadoop/util/bloom/CountingBloomFilter.java

@@ -0,0 +1,305 @@
+/**
+ *
+ * Copyright (c) 2005, European Commission project OneLab under contract 034819 (http://www.one-lab.org)
+ * All rights reserved.
+ * Redistribution and use in source and binary forms, with or 
+ * without modification, are permitted provided that the following 
+ * conditions are met:
+ *  - Redistributions of source code must retain the above copyright 
+ *    notice, this list of conditions and the following disclaimer.
+ *  - Redistributions in binary form must reproduce the above copyright 
+ *    notice, this list of conditions and the following disclaimer in 
+ *    the documentation and/or other materials provided with the distribution.
+ *  - Neither the name of the University Catholique de Louvain - UCL
+ *    nor the names of its contributors may be used to endorse or 
+ *    promote products derived from this software without specific prior 
+ *    written permission.
+ *    
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
+ * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
+ * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util.bloom;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Implements a <i>counting Bloom filter</i>, as defined by Fan et al. in a ToN
+ * 2000 paper.
+ * <p>
+ * A counting Bloom filter is an improvement to a standard Bloom filter as it
+ * allows dynamic additions and deletions of set membership information.  This 
+ * is achieved through the use of a counting vector instead of a bit vector.
+ * <p>
+ * Originally created by
+ * <a href="http://www.one-lab.org">European Commission One-Lab Project 034819</a>.
+ *
+ * @see Filter The general behavior of a filter
+ * 
+ * @see <a href="http://portal.acm.org/citation.cfm?id=343571.343572">Summary cache: a scalable wide-area web cache sharing protocol</a>
+ */
+public final class CountingBloomFilter extends Filter {
+  /** Storage for the counting buckets */
+  private long[] buckets;
+
+  /** We are using 4-bit buckets, so each bucket can count to 15 */
+  private final static long BUCKET_MAX_VALUE = 15;
+
+  /** Default constructor - use with readFields */
+  public CountingBloomFilter() {}
+  
+  /**
+   * Constructor
+   * @param vectorSize The vector size of <i>this</i> filter.
+   * @param nbHash The number of hash functions to consider.
+   * @param hashType type of the hashing function (see
+   * {@link org.apache.hadoop.util.hash.Hash}).
+   */
+  public CountingBloomFilter(int vectorSize, int nbHash, int hashType) {
+    super(vectorSize, nbHash, hashType);
+    buckets = new long[buckets2words(vectorSize)];
+  }
+
+  /** returns the number of 64-bit words it would take to hold vectorSize buckets */
+  private static int buckets2words(int vectorSize) {
+   return ((vectorSize - 1) >>> 4) + 1;
+  }
+
+  @Override
+  public void add(Key key) {
+    if(key == null) {
+      throw new NullPointerException("key can not be null");
+    }
+
+    int[] h = hash.hash(key);
+    hash.clear();
+
+    for(int i = 0; i < nbHash; i++) {
+      // find the bucket
+      int wordNum = h[i] >> 4;          // div 16
+      int bucketShift = (h[i] & 0x0f) << 2;  // (mod 16) * 4
+      
+      long bucketMask = 15L << bucketShift;
+      long bucketValue = (buckets[wordNum] & bucketMask) >>> bucketShift;
+      
+      // only increment if the count in the bucket is less than BUCKET_MAX_VALUE
+      if(bucketValue < BUCKET_MAX_VALUE) {
+        // increment by 1
+        buckets[wordNum] = (buckets[wordNum] & ~bucketMask) | ((bucketValue + 1) << bucketShift);
+      }
+    }
+  }
+
+  /**
+   * Removes a specified key from <i>this</i> counting Bloom filter.
+   * <p>
+   * <b>Invariant</b>: nothing happens if the specified key does not belong to <i>this</i> counting Bloom filter.
+   * @param key The key to remove.
+   */
+  public void delete(Key key) {
+    if(key == null) {
+      throw new NullPointerException("Key may not be null");
+    }
+    if(!membershipTest(key)) {
+      throw new IllegalArgumentException("Key is not a member");
+    }
+
+    int[] h = hash.hash(key);
+    hash.clear();
+
+    for(int i = 0; i < nbHash; i++) {
+      // find the bucket
+      int wordNum = h[i] >> 4;          // div 16
+      int bucketShift = (h[i] & 0x0f) << 2;  // (mod 16) * 4
+      
+      long bucketMask = 15L << bucketShift;
+      long bucketValue = (buckets[wordNum] & bucketMask) >>> bucketShift;
+      
+      // only decrement if the count is between 1 and BUCKET_MAX_VALUE - 1;
+      // a bucket saturated at BUCKET_MAX_VALUE may have overflowed, so it
+      // is left untouched
+      if(bucketValue >= 1 && bucketValue < BUCKET_MAX_VALUE) {
+        // decrement by 1
+        buckets[wordNum] = (buckets[wordNum] & ~bucketMask) | ((bucketValue - 1) << bucketShift);
+      }
+    }
+  }
+
+  @Override
+  public void and(Filter filter) {
+    if(filter == null
+        || !(filter instanceof CountingBloomFilter)
+        || filter.vectorSize != this.vectorSize
+        || filter.nbHash != this.nbHash) {
+      throw new IllegalArgumentException("filters cannot be and-ed");
+    }
+    CountingBloomFilter cbf = (CountingBloomFilter)filter;
+    
+    int sizeInWords = buckets2words(vectorSize);
+    for(int i = 0; i < sizeInWords; i++) {
+      this.buckets[i] &= cbf.buckets[i];
+    }
+  }
+
+  @Override
+  public boolean membershipTest(Key key) {
+    if(key == null) {
+      throw new NullPointerException("Key may not be null");
+    }
+
+    int[] h = hash.hash(key);
+    hash.clear();
+
+    for(int i = 0; i < nbHash; i++) {
+      // find the bucket
+      int wordNum = h[i] >> 4;          // div 16
+      int bucketShift = (h[i] & 0x0f) << 2;  // (mod 16) * 4
+
+      long bucketMask = 15L << bucketShift;
+
+      if((buckets[wordNum] & bucketMask) == 0) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * This method calculates an approximate count of the key, i.e. how many
+   * times the key was added to the filter. This allows the filter to be
+   * used as an approximate <code>key -&gt; count</code> map.
+   * <p>NOTE: due to the bucket size of this filter, inserting the same
+   * key more than 15 times will cause an overflow at all filter positions
+   * associated with this key, and it will significantly increase the error
+   * rate for this and other keys. For this reason the filter can only be
+   * used to store small count values <code>0 &lt;= N &lt;&lt; 15</code>.
+   * @param key key to be tested
+   * @return 0 if the key is not present. Otherwise, a positive value v will
+   * be returned such that <code>v == count</code> with high probability,
+   * and <code>v &gt; count</code> with probability approximately equal to
+   * the error rate of this filter.
+   * Additionally, if the filter experienced an underflow as a result of
+   * {@link #delete(Key)} operation, the return value may be lower than the
+   * <code>count</code> with the probability of the false negative rate of such
+   * filter.
+   */
+  public int approximateCount(Key key) {
+    int res = Integer.MAX_VALUE;
+    int[] h = hash.hash(key);
+    hash.clear();
+    for (int i = 0; i < nbHash; i++) {
+      // find the bucket
+      int wordNum = h[i] >> 4;          // div 16
+      int bucketShift = (h[i] & 0x0f) << 2;  // (mod 16) * 4
+      
+      long bucketMask = 15L << bucketShift;
+      long bucketValue = (buckets[wordNum] & bucketMask) >>> bucketShift;
+      if (bucketValue < res) res = (int)bucketValue;
+    }
+    if (res != Integer.MAX_VALUE) {
+      return res;
+    } else {
+      return 0;
+    }
+  }
+
+  @Override
+  public void not() {
+    throw new UnsupportedOperationException("not() is undefined for "
+        + this.getClass().getName());
+  }
+
+  @Override
+  public void or(Filter filter) {
+    if(filter == null
+        || !(filter instanceof CountingBloomFilter)
+        || filter.vectorSize != this.vectorSize
+        || filter.nbHash != this.nbHash) {
+      throw new IllegalArgumentException("filters cannot be or-ed");
+    }
+
+    CountingBloomFilter cbf = (CountingBloomFilter)filter;
+
+    int sizeInWords = buckets2words(vectorSize);
+    for(int i = 0; i < sizeInWords; i++) {
+      this.buckets[i] |= cbf.buckets[i];
+    }
+  }
+
+  @Override
+  public void xor(Filter filter) {
+    throw new UnsupportedOperationException("xor() is undefined for "
+        + this.getClass().getName());
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder res = new StringBuilder();
+
+    for(int i = 0; i < vectorSize; i++) {
+      if(i > 0) {
+        res.append(" ");
+      }
+      
+      int wordNum = i >> 4;          // div 16
+      int bucketShift = (i & 0x0f) << 2;  // (mod 16) * 4
+      
+      long bucketMask = 15L << bucketShift;
+      long bucketValue = (buckets[wordNum] & bucketMask) >>> bucketShift;
+      
+      res.append(bucketValue);
+    }
+
+    return res.toString();
+  }
+
+  // Writable
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    int sizeInWords = buckets2words(vectorSize);
+    for(int i = 0; i < sizeInWords; i++) {
+      out.writeLong(buckets[i]);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    int sizeInWords = buckets2words(vectorSize);
+    buckets = new long[sizeInWords];
+    for(int i = 0; i < sizeInWords; i++) {
+      buckets[i] = in.readLong();
+    }
+  }
+}
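The 4-bit buckets turn membership into approximate counting with deletions; a short sketch (parameters arbitrary):

    CountingBloomFilter cbf = new CountingBloomFilter(1 << 16, 5, Hash.MURMUR_HASH);
    Key k = new Key("session-42".getBytes());

    cbf.add(k);
    cbf.add(k);
    int c = cbf.approximateCount(k);   // >= 2; equal to 2 with high probability

    cbf.delete(k);                     // decrements each 4-bit bucket for k
    cbf.membershipTest(k);             // still true: one add() remains
    cbf.delete(k);
    cbf.membershipTest(k);             // now false, barring collisions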

+ 293 - 0
src/core/org/apache/hadoop/util/bloom/DynamicBloomFilter.java

@@ -0,0 +1,293 @@
+/**
+ *
+ * Copyright (c) 2005, European Commission project OneLab under contract 034819 (http://www.one-lab.org)
+ * All rights reserved.
+ * Redistribution and use in source and binary forms, with or 
+ * without modification, are permitted provided that the following 
+ * conditions are met:
+ *  - Redistributions of source code must retain the above copyright 
+ *    notice, this list of conditions and the following disclaimer.
+ *  - Redistributions in binary form must reproduce the above copyright 
+ *    notice, this list of conditions and the following disclaimer in 
+ *    the documentation and/or other materials provided with the distribution.
+ *  - Neither the name of the University Catholique de Louvain - UCL
+ *    nor the names of its contributors may be used to endorse or 
+ *    promote products derived from this software without specific prior 
+ *    written permission.
+ *    
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
+ * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
+ * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util.bloom;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Implements a <i>dynamic Bloom filter</i>, as defined in the INFOCOM 2006 paper.
+ * <p>
+ * A dynamic Bloom filter (DBF) makes use of a <code>s * m</code> bit matrix but
+ * each of the <code>s</code> rows is a standard Bloom filter. The creation 
+ * process of a DBF is iterative. At the start, the DBF is a <code>1 * m</code>
+ * bit matrix, i.e., it is composed of a single standard Bloom filter.
+ * It assumes that <code>n<sub>r</sub></code> elements are recorded in the 
+ * initial bit vector, where <code>n<sub>r</sub> <= n</code> (<code>n</code> is
+ * the cardinality of the set <code>A</code> to record in the filter).  
+ * <p>
+ * As the size of <code>A</code> grows during the execution of the application,
+ * several keys must be inserted in the DBF.  When inserting a key into the DBF,
+ * one must first get an active Bloom filter in the matrix.  A Bloom filter is
+ * active when the number of recorded keys, <code>n<sub>r</sub></code>, is 
+ * strictly less than the current cardinality of <code>A</code>, <code>n</code>.
+ * If an active Bloom filter is found, the key is inserted and 
+ * <code>n<sub>r</sub></code> is incremented by one. On the other hand, if there
+ * is no active Bloom filter, a new one is created (i.e., a new row is added
+ * to the matrix) according to the current size of <code>A</code>, the element
+ * is added to this new Bloom filter, and its <code>n<sub>r</sub></code> value
+ * is set to one.  A given key is said to belong to the
+ * DBF if the <code>k</code> positions are set to one in one of the matrix rows.
+ * <p>
+ * Originally created by
+ * <a href="http://www.one-lab.org">European Commission One-Lab Project 034819</a>.
+ *
+ * @see Filter The general behavior of a filter
+ * @see BloomFilter A Bloom filter
+ * 
+ * @see <a href="http://www.cse.fau.edu/~jie/research/publications/Publication_files/infocom2006.pdf">Theory and Network Applications of Dynamic Bloom Filters</a>
+ */
+public class DynamicBloomFilter extends Filter {
+  /** 
+   * Threshold for the maximum number of keys to record in a dynamic Bloom filter row.
+   */
+  private int nr;
+
+  /**
+   * The number of keys recorded in the current standard active Bloom filter.
+   */
+  private int currentNbRecord;
+
+  /**
+   * The matrix of Bloom filters.
+   */
+  private BloomFilter[] matrix;
+
+  /**
+   * Zero-argument constructor, used during deserialization.
+   */
+  public DynamicBloomFilter() { }
+
+  /**
+   * Constructor.
+   * <p>
+   * Builds an empty Dynamic Bloom filter.
+   * @param vectorSize The number of bits in the vector.
+   * @param nbHash The number of hash functions to consider.
+   * @param hashType type of the hashing function (see
+   * {@link org.apache.hadoop.util.hash.Hash}).
+   * @param nr The threshold for the maximum number of keys to record in a
+   * dynamic Bloom filter row.
+   */
+  public DynamicBloomFilter(int vectorSize, int nbHash, int hashType, int nr) {
+    super(vectorSize, nbHash, hashType);
+
+    this.nr = nr;
+    this.currentNbRecord = 0;
+
+    matrix = new BloomFilter[1];
+    matrix[0] = new BloomFilter(this.vectorSize, this.nbHash, this.hashType);
+  }
+
+  @Override
+  public void add(Key key) {
+    if (key == null) {
+      throw new NullPointerException("Key can not be null");
+    }
+
+    BloomFilter bf = getActiveStandardBF();
+
+    if (bf == null) {
+      addRow();
+      bf = matrix[matrix.length - 1];
+      currentNbRecord = 0;
+    }
+
+    bf.add(key);
+
+    currentNbRecord++;
+  }
+
+  @Override
+  public void and(Filter filter) {
+    if (filter == null
+        || !(filter instanceof DynamicBloomFilter)
+        || filter.vectorSize != this.vectorSize
+        || filter.nbHash != this.nbHash) {
+      throw new IllegalArgumentException("filters cannot be and-ed");
+    }
+
+    DynamicBloomFilter dbf = (DynamicBloomFilter)filter;
+
+    if (dbf.matrix.length != this.matrix.length || dbf.nr != this.nr) {
+      throw new IllegalArgumentException("filters cannot be and-ed");
+    }
+
+    for (int i = 0; i < matrix.length; i++) {
+      matrix[i].and(dbf.matrix[i]);
+    }
+  }
+
+  @Override
+  public boolean membershipTest(Key key) {
+    if (key == null) {
+      return true;
+    }
+
+    for (int i = 0; i < matrix.length; i++) {
+      if (matrix[i].membershipTest(key)) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  @Override
+  public void not() {
+    for (int i = 0; i < matrix.length; i++) {
+      matrix[i].not();
+    }
+  }
+
+  @Override
+  public void or(Filter filter) {
+    if (filter == null
+        || !(filter instanceof DynamicBloomFilter)
+        || filter.vectorSize != this.vectorSize
+        || filter.nbHash != this.nbHash) {
+      throw new IllegalArgumentException("filters cannot be or-ed");
+    }
+
+    DynamicBloomFilter dbf = (DynamicBloomFilter)filter;
+
+    if (dbf.matrix.length != this.matrix.length || dbf.nr != this.nr) {
+      throw new IllegalArgumentException("filters cannot be or-ed");
+    }
+    for (int i = 0; i < matrix.length; i++) {
+      matrix[i].or(dbf.matrix[i]);
+    }
+  }
+
+  @Override
+  public void xor(Filter filter) {
+    if (filter == null
+        || !(filter instanceof DynamicBloomFilter)
+        || filter.vectorSize != this.vectorSize
+        || filter.nbHash != this.nbHash) {
+      throw new IllegalArgumentException("filters cannot be xor-ed");
+    }
+    DynamicBloomFilter dbf = (DynamicBloomFilter)filter;
+
+    if (dbf.matrix.length != this.matrix.length || dbf.nr != this.nr) {
+      throw new IllegalArgumentException("filters cannot be xor-ed");
+    }
+
+    for (int i = 0; i < matrix.length; i++) {
+      matrix[i].xor(dbf.matrix[i]);
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder res = new StringBuilder();
+
+    for (int i = 0; i < matrix.length; i++) {
+      res.append(matrix[i]);
+      // Character.LINE_SEPARATOR is a Unicode category constant, not a
+      // newline character; append a real line break instead
+      res.append('\n');
+    }
+    return res.toString();
+  }
+
+  // Writable
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    out.writeInt(nr);
+    out.writeInt(currentNbRecord);
+    out.writeInt(matrix.length);
+    for (int i = 0; i < matrix.length; i++) {
+      matrix[i].write(out);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    nr = in.readInt();
+    currentNbRecord = in.readInt();
+    int len = in.readInt();
+    matrix = new BloomFilter[len];
+    for (int i = 0; i < matrix.length; i++) {
+      matrix[i] = new BloomFilter();
+      matrix[i].readFields(in);
+    }
+  }
+
+  /**
+   * Adds a new row to <i>this</i> dynamic Bloom filter.
+   */
+  private void addRow() {
+    BloomFilter[] tmp = new BloomFilter[matrix.length + 1];
+
+    for (int i = 0; i < matrix.length; i++) {
+      tmp[i] = matrix[i];
+    }
+
+    tmp[tmp.length-1] = new BloomFilter(vectorSize, nbHash, hashType);
+
+    matrix = tmp;
+  }
+
+  /**
+   * Returns the active standard Bloom filter in <i>this</i> dynamic Bloom filter.
+   * @return The active standard Bloom filter, or <code>null</code> if the
+   * current filter has reached its key threshold.
+   */
+  private BloomFilter getActiveStandardBF() {
+    if (currentNbRecord >= nr) {
+      return null;
+    }
+
+    return matrix[matrix.length - 1];
+  }
+}
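Row growth is easiest to see with a tiny threshold; a sketch (parameters arbitrary):

    // nr = 2: each standard BloomFilter row records at most two keys.
    DynamicBloomFilter dbf = new DynamicBloomFilter(1 << 10, 5, Hash.MURMUR_HASH, 2);
    dbf.add(new Key("a".getBytes()));
    dbf.add(new Key("b".getBytes()));   // the first row is now full
    dbf.add(new Key("c".getBytes()));   // getActiveStandardBF() returns null,
                                        // so addRow() grows the matrix to 2 rows
    // membershipTest() scans every row; a key matches if any row matches.
    dbf.membershipTest(new Key("b".getBytes()));   // true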

+ 213 - 0
src/core/org/apache/hadoop/util/bloom/Filter.java

@@ -0,0 +1,213 @@
+/**
+ *
+ * Copyright (c) 2005, European Commission project OneLab under contract 034819
+ * (http://www.one-lab.org)
+ * 
+ * All rights reserved.
+ * Redistribution and use in source and binary forms, with or 
+ * without modification, are permitted provided that the following 
+ * conditions are met:
+ *  - Redistributions of source code must retain the above copyright 
+ *    notice, this list of conditions and the following disclaimer.
+ *  - Redistributions in binary form must reproduce the above copyright 
+ *    notice, this list of conditions and the following disclaimer in 
+ *    the documentation and/or other materials provided with the distribution.
+ *  - Neither the name of the University Catholique de Louvain - UCL
+ *    nor the names of its contributors may be used to endorse or 
+ *    promote products derived from this software without specific prior 
+ *    written permission.
+ *    
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
+ * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
+ * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util.bloom;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.hash.Hash;
+
+/**
+ * Defines the general behavior of a filter.
+ * <p>
+ * A filter is a data structure which aims at offering a lossy summary of a set <code>A</code>.  The
+ * key idea is to map entries of <code>A</code> (also called <i>keys</i>) into several positions 
+ * in a vector through the use of several hash functions.
+ * <p>
+ * Typically, a filter will be implemented as a Bloom filter (or a Bloom filter extension).
+ * <p>
+ * It must be extended in order to define the real behavior.
+ * 
+ * @see Key The general behavior of a key
+ * @see HashFunction A hash function
+ */
+public abstract class Filter implements Writable {
+  private static final int VERSION = -1; // negative to distinguish from the old, unversioned format
+  /** The vector size of <i>this</i> filter. */
+  protected int vectorSize;
+
+  /** The hash function used to map a key to several positions in the vector. */
+  protected HashFunction hash;
+
+  /** The number of hash functions to consider. */
+  protected int nbHash;
+  
+  /** Type of hashing function to use. */
+  protected int hashType;
+
+  protected Filter() {}
+  
+  /** 
+   * Constructor.
+   * @param vectorSize The vector size of <i>this</i> filter.
+   * @param nbHash The number of hash functions to consider.
+   * @param hashType type of the hashing function (see {@link Hash}).
+   */
+  protected Filter(int vectorSize, int nbHash, int hashType) {
+    this.vectorSize = vectorSize;
+    this.nbHash = nbHash;
+    this.hashType = hashType;
+    this.hash = new HashFunction(this.vectorSize, this.nbHash, this.hashType);
+  }
+
+  /**
+   * Adds a key to <i>this</i> filter.
+   * @param key The key to add.
+   */
+  public abstract void add(Key key);
+
+  /**
+   * Determines whether a specified key belongs to <i>this</i> filter.
+   * @param key The key to test.
+   * @return boolean True if the specified key belongs to <i>this</i> filter,
+   * false otherwise.
+   */
+  public abstract boolean membershipTest(Key key);
+
+  /**
+   * Performs a logical AND between <i>this</i> filter and a specified filter.
+   * <p>
+   * <b>Invariant</b>: The result is assigned to <i>this</i> filter.
+   * @param filter The filter to AND with.
+   */
+  public abstract void and(Filter filter);
+
+  /**
+   * Performs a logical OR between <i>this</i> filter and a specified filter.
+   * <p>
+   * <b>Invariant</b>: The result is assigned to <i>this</i> filter.
+   * @param filter The filter to OR with.
+   */
+  public abstract void or(Filter filter);
+
+  /**
+   * Performs a logical XOR between <i>this</i> filter and a specified filter.
+   * <p>
+   * <b>Invariant</b>: The result is assigned to <i>this</i> filter.
+   * @param filter The filter to XOR with.
+   */
+  public abstract void xor(Filter filter);
+
+  /**
+   * Performs a logical NOT on <i>this</i> filter.
+   * <p>
+   * The result is assigned to <i>this</i> filter.
+   */
+  public abstract void not();
+
+  /**
+   * Adds a list of keys to <i>this</i> filter.
+   * @param keys The list of keys.
+   */
+  public void add(List<Key> keys){
+    if(keys == null) {
+      throw new IllegalArgumentException("ArrayList<Key> may not be null");
+    }
+
+    for(Key key: keys) {
+      add(key);
+    }
+  }//end add()
+
+  /**
+   * Adds a collection of keys to <i>this</i> filter.
+   * @param keys The collection of keys.
+   */
+  public void add(Collection<Key> keys){
+    if(keys == null) {
+      throw new IllegalArgumentException("Collection<Key> may not be null");
+    }
+    for(Key key: keys) {
+      add(key);
+    }
+  }//end add()
+
+  /**
+   * Adds an array of keys to <i>this</i> filter.
+   * @param keys The array of keys.
+   */
+  public void add(Key[] keys){
+    if(keys == null) {
+      throw new IllegalArgumentException("Key[] may not be null");
+    }
+    for(int i = 0; i < keys.length; i++) {
+      add(keys[i]);
+    }
+  }//end add()
+  
+  // Writable interface
+  
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(VERSION);
+    out.writeInt(this.nbHash);
+    out.writeByte(this.hashType);
+    out.writeInt(this.vectorSize);
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    int ver = in.readInt();
+    if (ver > 0) { // old unversioned format
+      this.nbHash = ver;
+      this.hashType = Hash.JENKINS_HASH;
+    } else if (ver == VERSION) {
+      this.nbHash = in.readInt();
+      this.hashType = in.readByte();
+    } else {
+      throw new IOException("Unsupported version: " + ver);
+    }
+    this.vectorSize = in.readInt();
+    this.hash = new HashFunction(this.vectorSize, this.nbHash, this.hashType);
+  }
+}//end class
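The versioned Writable format round-trips through ordinary Java streams; a sketch (stream plumbing from java.io assumed):

    BloomFilter original = new BloomFilter(1 << 10, 5, Hash.JENKINS_HASH);
    original.add(new Key("x".getBytes()));

    // write(): the VERSION marker (-1), then nbHash, hashType, vectorSize, bits
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    original.write(new DataOutputStream(baos));

    // readFields(): a positive leading int is instead treated as nbHash from
    // the old, unversioned format, with JENKINS_HASH assumed
    BloomFilter restored = new BloomFilter();
    restored.readFields(new DataInputStream(
        new ByteArrayInputStream(baos.toByteArray())));
    restored.membershipTest(new Key("x".getBytes()));   // true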

+ 118 - 0
src/core/org/apache/hadoop/util/bloom/HashFunction.java

@@ -0,0 +1,118 @@
+/**
+ *
+ * Copyright (c) 2005, European Commission project OneLab under contract 034819 
+ * (http://www.one-lab.org)
+ * 
+ * All rights reserved.
+ * Redistribution and use in source and binary forms, with or 
+ * without modification, are permitted provided that the following 
+ * conditions are met:
+ *  - Redistributions of source code must retain the above copyright 
+ *    notice, this list of conditions and the following disclaimer.
+ *  - Redistributions in binary form must reproduce the above copyright 
+ *    notice, this list of conditions and the following disclaimer in 
+ *    the documentation and/or other materials provided with the distribution.
+ *  - Neither the name of the University Catholique de Louvain - UCL
+ *    nor the names of its contributors may be used to endorse or 
+ *    promote products derived from this software without specific prior 
+ *    written permission.
+ *    
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
+ * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
+ * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util.bloom;
+
+import org.apache.hadoop.util.hash.Hash;
+
+/**
+ * Implements a hash object that returns a certain number of hashed values.
+ * 
+ * @see Key The general behavior of a key being stored in a filter
+ * @see Filter The general behavior of a filter
+ */
+public final class HashFunction {
+  /** The number of hashed values. */
+  private int nbHash;
+
+  /** The maximum value that can be returned (exclusive upper bound on hashes). */
+  private int maxValue;
+
+  /** Hashing algorithm to use. */
+  private Hash hashFunction;
+  
+  /**
+   * Constructor.
+   * <p>
+   * Builds a hash function constrained to a given number of returned values
+   * and a maximum value.
+   * @param maxValue The maximum value that can be returned (exclusive).
+   * @param nbHash The number of resulting hashed values.
+   * @param hashType type of the hashing function (see {@link Hash}).
+   */
+  public HashFunction(int maxValue, int nbHash, int hashType) {
+    if (maxValue <= 0) {
+      throw new IllegalArgumentException("maxValue must be > 0");
+    }
+    
+    if (nbHash <= 0) {
+      throw new IllegalArgumentException("nbHash must be > 0");
+    }
+
+    this.maxValue = maxValue;
+    this.nbHash = nbHash;
+    this.hashFunction = Hash.getInstance(hashType);
+    if (this.hashFunction == null)
+      throw new IllegalArgumentException("hashType must be known");
+  }
+
+  /** Clears <i>this</i> hash function. A NOOP */
+  public void clear() {
+  }
+
+  /**
+   * Hashes a specified key into several integers.
+   * @param k The specified key.
+   * @return The array of hashed values.
+   */
+  public int[] hash(Key k) {
+    byte[] b = k.getBytes();
+    if (b == null) {
+      throw new NullPointerException("buffer reference is null");
+    }
+    if (b.length == 0) {
+      throw new IllegalArgumentException("key length must be > 0");
+    }
+    int[] result = new int[nbHash];
+    for (int i = 0, initval = 0; i < nbHash; i++) {
+      initval = result[i] = Math.abs(hashFunction.hash(b, initval) % maxValue);
+    }
+    return result;
+  }
+}
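Each hash seeds the next via the initval assignment above, and every result is folded into [0, maxValue); a sketch:

    HashFunction f = new HashFunction(1 << 20, 5, Hash.MURMUR_HASH);
    int[] positions = f.hash(new Key("example".getBytes()));
    // positions.length == 5; a Bloom filter sets (or tests) one bit per entry.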

+ 178 - 0
src/core/org/apache/hadoop/util/bloom/Key.java

@@ -0,0 +1,178 @@
+/**
+ *
+ * Copyright (c) 2005, European Commission project OneLab under contract 034819 (http://www.one-lab.org)
+ * All rights reserved.
+ * Redistribution and use in source and binary forms, with or 
+ * without modification, are permitted provided that the following 
+ * conditions are met:
+ *  - Redistributions of source code must retain the above copyright 
+ *    notice, this list of conditions and the following disclaimer.
+ *  - Redistributions in binary form must reproduce the above copyright 
+ *    notice, this list of conditions and the following disclaimer in 
+ *    the documentation and/or other materials provided with the distribution.
+ *  - Neither the name of the University Catholique de Louvain - UCL
+ *    nor the names of its contributors may be used to endorse or 
+ *    promote products derived from this software without specific prior 
+ *    written permission.
+ *    
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
+ * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
+ * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util.bloom;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * The general behavior of a key that must be stored in a filter.
+ * 
+ * @see Filter The general behavior of a filter
+ */
+public class Key implements WritableComparable<Key> {
+  /** Byte value of key */
+  byte[] bytes;
+  
+  /**
+   * The weight associated with <i>this</i> key.
+   * <p>
+   * <b>Invariant</b>: if it is not specified, each instance of 
+   * <code>Key</code> will have a default weight of 1.0
+   */
+  double weight;
+
+  /** default constructor - use with readFields */
+  public Key() {}
+
+  /**
+   * Constructor.
+   * <p>
+   * Builds a key with a default weight.
+   * @param value The byte value of <i>this</i> key.
+   */
+  public Key(byte[] value) {
+    this(value, 1.0);
+  }
+
+  /**
+   * Constructor.
+   * <p>
+   * Builds a key with a specified weight.
+   * @param value The value of <i>this</i> key.
+   * @param weight The weight associated with <i>this</i> key.
+   */
+  public Key(byte[] value, double weight) {
+    set(value, weight);
+  }
+
+  /**
+   * Sets the value and weight of <i>this</i> key.
+   * @param value The byte value of <i>this</i> key.
+   * @param weight The weight associated with <i>this</i> key.
+   */
+  public void set(byte[] value, double weight) {
+    if (value == null) {
+      throw new IllegalArgumentException("value can not be null");
+    }
+    this.bytes = value;
+    this.weight = weight;
+  }
+  
+  /** @return The byte value of <i>this</i> key. */
+  public byte[] getBytes() {
+    return this.bytes;
+  }
+
+  /** @return The weight associated with <i>this</i> key. */
+  public double getWeight() {
+    return weight;
+  }
+
+  /**
+   * Increments the weight of <i>this</i> key by a specified value.
+   * @param weight The increment.
+   */
+  public void incrementWeight(double weight) {
+    this.weight += weight;
+  }
+
+  /** Increments the weight of <i>this</i> key by one. */
+  public void incrementWeight() {
+    this.weight++;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof Key)) {
+      return false;
+    }
+    return this.compareTo((Key)o) == 0;
+  }
+  
+  @Override
+  public int hashCode() {
+    int result = 0;
+    for (int i = 0; i < bytes.length; i++) {
+      result ^= Byte.valueOf(bytes[i]).hashCode();
+    }
+    result ^= Double.valueOf(weight).hashCode();
+    return result;
+  }
+
+  // Writable
+
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(bytes.length);
+    out.write(bytes);
+    out.writeDouble(weight);
+  }
+  
+  public void readFields(DataInput in) throws IOException {
+    this.bytes = new byte[in.readInt()];
+    in.readFully(this.bytes);
+    weight = in.readDouble();
+  }
+  
+  // Comparable
+  
+  public int compareTo(Key other) {
+    int result = this.bytes.length - other.getBytes().length;
+    for (int i = 0; result == 0 && i < bytes.length; i++) {
+      result = this.bytes[i] - other.bytes[i];
+    }
+    
+    if (result == 0) {
+      // compare weights directly; truncating the double difference to an int
+      // would treat weights that differ by less than 1.0 as equal
+      result = Double.compare(this.weight, other.weight);
+    }
+    return result;
+  }
+}
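
A short sketch of the Key life-cycle, exercising the weighted constructor and the Writable round-trip defined above; the stream plumbing and key bytes are illustrative:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.util.bloom.Key;

public class KeySketch {
  public static void main(String[] args) throws IOException {
    Key k = new Key("user-42".getBytes(), 2.0);
    k.incrementWeight(0.5);                     // weight is now 2.5

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    k.write(new DataOutputStream(bytes));       // writes length, bytes, weight

    Key copy = new Key();                       // default constructor + readFields
    copy.readFields(new DataInputStream(
        new ByteArrayInputStream(bytes.toByteArray())));
    System.out.println(k.equals(copy));         // true: bytes and weights match
  }
}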

+ 91 - 0
src/core/org/apache/hadoop/util/bloom/RemoveScheme.java

@@ -0,0 +1,91 @@
+/**
+ *
+ * Copyright (c) 2005, European Commission project OneLab under contract 034819
+ * (http://www.one-lab.org)
+ * 
+ * All rights reserved.
+ * Redistribution and use in source and binary forms, with or 
+ * without modification, are permitted provided that the following 
+ * conditions are met:
+ *  - Redistributions of source code must retain the above copyright 
+ *    notice, this list of conditions and the following disclaimer.
+ *  - Redistributions in binary form must reproduce the above copyright 
+ *    notice, this list of conditions and the following disclaimer in 
+ *    the documentation and/or other materials provided with the distribution.
+ *  - Neither the name of the University Catholique de Louvain - UCL
+ *    nor the names of its contributors may be used to endorse or 
+ *    promote products derived from this software without specific prior 
+ *    written permission.
+ *    
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
+ * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
+ * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util.bloom;
+
+/**
+ * Defines the different removal schemes for retouched Bloom filters.
+ * <p>
+ * Originally created by
+ * <a href="http://www.one-lab.org">European Commission One-Lab Project 034819</a>.
+ */
+public interface RemoveScheme {
+  /**
+   * Random selection.
+   * <p>
+   * The idea is to randomly select a bit to reset.
+   */
+  public final static short RANDOM = 0;
+
+  /**
+   * MinimumFN Selection.
+   * <p>
+   * The idea is to select the bit to reset that will generate the minimum
+   * number of false negatives.
+   */
+  public final static short MINIMUM_FN = 1;
+
+  /**
+   * MaximumFP Selection.
+   * <p>
+   * The idea is to select the bit to reset that will remove the maximum number
+   * of false positives.
+   */
+  public final static short MAXIMUM_FP = 2;
+
+  /**
+   * Ratio Selection.
+   * <p>
+   * The idea is to select the bit to reset that will, at the same time, remove
+   * the maximum number of false positives while minimizing the number of
+   * false negatives generated.
+   */
+  public final static short RATIO = 3;
+}

+ 450 - 0
src/core/org/apache/hadoop/util/bloom/RetouchedBloomFilter.java

@@ -0,0 +1,450 @@
+/**
+ *
+ * Copyright (c) 2005, European Commission project OneLab under contract 034819 (http://www.one-lab.org)
+ * All rights reserved.
+ * Redistribution and use in source and binary forms, with or 
+ * without modification, are permitted provided that the following 
+ * conditions are met:
+ *  - Redistributions of source code must retain the above copyright 
+ *    notice, this list of conditions and the following disclaimer.
+ *  - Redistributions in binary form must reproduce the above copyright 
+ *    notice, this list of conditions and the following disclaimer in 
+ *    the documentation and/or other materials provided with the distribution.
+ *  - Neither the name of the University Catholique de Louvain - UCL
+ *    nor the names of its contributors may be used to endorse or 
+ *    promote products derived from this software without specific prior 
+ *    written permission.
+ *    
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
+ * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
+ * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util.bloom;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Implements a <i>retouched Bloom filter</i>, as defined in the CoNEXT 2006 paper.
+ * <p>
+ * It allows the removal of selected false positives at the cost of introducing
+ * random false negatives, and with the benefit of eliminating some random false
+ * positives at the same time.
+ * 
+ * <p>
+ * Originally created by
+ * <a href="http://www.one-lab.org">European Commission One-Lab Project 034819</a>.
+ * 
+ * @see Filter The general behavior of a filter
+ * @see BloomFilter A Bloom filter
+ * @see RemoveScheme The different selective clearing algorithms
+ * 
+ * @see <a href="http://www-rp.lip6.fr/site_npa/site_rp/_publications/740-rbf_cameraready.pdf">Retouched Bloom Filters: Allowing Networked Applications to Trade Off Selected False Positives Against False Negatives</a>
+ */
+public final class RetouchedBloomFilter extends BloomFilter
+implements RemoveScheme {
+  /**
+   * KeyList vector (or ElementList Vector, as defined in the paper) of false positives.
+   */
+  List<Key>[] fpVector;
+
+  /**
+   * KeyList vector of keys recorded in the filter.
+   */
+  List<Key>[] keyVector;
+
+  /**
+   * Ratio vector.
+   */
+  double[] ratio;
+  
+  private Random rand;
+
+  /** Default constructor - use with readFields */
+  public RetouchedBloomFilter() {}
+  
+  /**
+   * Constructor
+   * @param vectorSize The vector size of <i>this</i> filter.
+   * @param nbHash The number of hash functions to consider.
+   * @param hashType type of the hashing function (see
+   * {@link org.apache.hadoop.util.hash.Hash}).
+   */
+  public RetouchedBloomFilter(int vectorSize, int nbHash, int hashType) {
+    super(vectorSize, nbHash, hashType);
+
+    this.rand = null;
+    createVector();
+  }
+
+  @Override
+  public void add(Key key) {
+    if (key == null) {
+      throw new NullPointerException("key can not be null");
+    }
+
+    int[] h = hash.hash(key);
+    hash.clear();
+
+    for (int i = 0; i < nbHash; i++) {
+      bits.set(h[i]);
+      keyVector[h[i]].add(key);
+    }
+  }
+
+  /**
+   * Adds false positive information to <i>this</i> retouched Bloom filter.
+   * <p>
+   * Throws a {@link NullPointerException} if the key is <code>null</code>.
+   * @param key The false positive key to add.
+   */
+  public void addFalsePositive(Key key) {
+    if (key == null) {
+      throw new NullPointerException("key can not be null");
+    }
+
+    int[] h = hash.hash(key);
+    hash.clear();
+
+    for (int i = 0; i < nbHash; i++) {
+      fpVector[h[i]].add(key);
+    }
+  }
+
+  /**
+   * Adds a collection of false positive information to <i>this</i> retouched Bloom filter.
+   * @param coll The collection of false positive keys.
+   */
+  public void addFalsePositive(Collection<Key> coll) {
+    if (coll == null) {
+      throw new NullPointerException("Collection<Key> can not be null");
+    }
+    
+    for (Key k : coll) {
+      addFalsePositive(k);
+    }
+  }
+
+  /**
+   * Adds a list of false positive information to <i>this</i> retouched Bloom filter.
+   * @param keys The list of false positive keys.
+   */
+  public void addFalsePositive(List<Key> keys) {
+    if (keys == null) {
+      throw new NullPointerException("ArrayList<Key> can not be null");
+    }
+
+    for (Key k : keys) {
+      addFalsePositive(k);
+    }
+  }
+
+  /**
+   * Adds an array of false positive information to <i>this</i> retouched Bloom filter.
+   * @param keys The array of false positive keys.
+   */
+  public void addFalsePositive(Key[] keys) {
+    if (keys == null) {
+      throw new NullPointerException("Key[] can not be null");
+    }
+
+    for (int i = 0; i < keys.length; i++) {
+      addFalsePositive(keys[i]);
+    }
+  }
+
+  /**
+   * Performs the selective clearing for a given key.
+   * @param k The false positive key to remove from <i>this</i> retouched Bloom filter.
+   * @param scheme The selective clearing scheme to apply.
+   */
+  public void selectiveClearing(Key k, short scheme) {
+    if (k == null) {
+      throw new NullPointerException("Key can not be null");
+    }
+
+    if (!membershipTest(k)) {
+      throw new IllegalArgumentException("Key is not a member");
+    }
+
+    int index = 0;
+    int[] h = hash.hash(k);
+
+    switch(scheme) {
+
+    case RANDOM:
+      index = randomRemove();
+      break;
+    
+    case MINIMUM_FN:
+      index = minimumFnRemove(h);
+      break;
+    
+    case MAXIMUM_FP:
+      index = maximumFpRemove(h);
+      break;
+    
+    case RATIO:
+      index = ratioRemove(h);
+      break;
+    
+    default:
+      throw new AssertionError("Undefined selective clearing scheme");
+
+    }
+
+    clearBit(index);
+  }
+
+  private int randomRemove() {
+    if (rand == null) {
+      rand = new Random();
+    }
+
+    return rand.nextInt(nbHash);
+  }
+
+  /**
+   * Chooses the bit position that minimizes the number of false negatives generated.
+   * @param h The different bit positions.
+   * @return The position that minimizes the number of false negatives generated.
+   */
+  private int minimumFnRemove(int[] h) {
+    int minIndex = Integer.MAX_VALUE;
+    double minValue = Double.MAX_VALUE;
+
+    for (int i = 0; i < nbHash; i++) {
+      double keyWeight = getWeight(keyVector[h[i]]);
+
+      if (keyWeight < minValue) {
+        minIndex = h[i];
+        minValue = keyWeight;
+      }
+
+    }
+
+    return minIndex;
+  }
+
+  /**
+   * Chooses the bit position that maximizes the number of false positives removed.
+   * @param h The different bit positions.
+   * @return The position that maximizes the number of false positives removed.
+   */
+  private int maximumFpRemove(int[] h) {
+    int maxIndex = Integer.MIN_VALUE;
+    double maxValue = Double.MIN_VALUE;
+
+    for (int i = 0; i < nbHash; i++) {
+      double fpWeight = getWeight(fpVector[h[i]]);
+
+      if (fpWeight > maxValue) {
+        maxValue = fpWeight;
+        maxIndex = h[i];
+      }
+    }
+
+    return maxIndex;
+  }
+
+  /**
+   * Chooses the bit position that minimizes the number of false negatives
+   * generated while maximizing the number of false positives removed.
+   * @param h The different bit positions.
+   * @return The position that minimizes the number of false negatives generated
+   * while maximizing the number of false positives removed.
+   */
+  private int ratioRemove(int[] h) {
+    computeRatio();
+    int minIndex = Integer.MAX_VALUE;
+    double minValue = Double.MAX_VALUE;
+
+    for (int i = 0; i < nbHash; i++) {
+      if (ratio[h[i]] < minValue) {
+        minValue = ratio[h[i]];
+        minIndex = h[i];
+      }
+    }
+
+    return minIndex;
+  }
+
+  /**
+   * Clears a specified bit in the bit vector and keeps the KeyList vectors up to date.
+   * @param index The position of the bit to clear.
+   */
+  private void clearBit(int index) {
+    if (index < 0 || index >= vectorSize) {
+      throw new ArrayIndexOutOfBoundsException(index);
+    }
+
+    List<Key> kl = keyVector[index];
+    List<Key> fpl = fpVector[index];
+
+    // update key list
+    int listSize = kl.size();
+    for (int i = 0; i < listSize && !kl.isEmpty(); i++) {
+      removeKey(kl.get(0), keyVector);
+    }
+
+    kl.clear();  // kl aliases keyVector[index], so one clear() suffices
+
+    //update false positive list
+    listSize = fpl.size();
+    for (int i = 0; i < listSize && !fpl.isEmpty(); i++) {
+      removeKey(fpl.get(0), fpVector);
+    }
+
+    fpl.clear();  // fpl aliases fpVector[index], so one clear() suffices
+
+    //update ratio
+    ratio[index] = 0.0;
+
+    //update bit vector
+    bits.clear(index);
+  }
+
+  /**
+   * Removes a given key from <i>this</i> filter.
+   * @param k The key to remove.
+   * @param vector The counting vector associated with the key.
+   */
+  private void removeKey(Key k, List<Key>[] vector) {
+    if (k == null) {
+      throw new NullPointerException("Key can not be null");
+    }
+    if (vector == null) {
+      throw new NullPointerException("ArrayList<Key>[] can not be null");
+    }
+
+    int[] h = hash.hash(k);
+    hash.clear();
+
+    for (int i = 0; i < nbHash; i++) {
+      vector[h[i]].remove(k);
+    }
+  }
+
+  /**
+   * Computes, for each bit position, the ratio of recorded-key weight to false-positive weight (A/FP).
+   */
+  private void computeRatio() {
+    for (int i = 0; i < vectorSize; i++) {
+      double keyWeight = getWeight(keyVector[i]);
+      double fpWeight = getWeight(fpVector[i]);
+
+      if (keyWeight > 0 && fpWeight > 0) {
+        ratio[i] = keyWeight / fpWeight;
+      }
+    }
+  }
+
+  private double getWeight(List<Key> keyList) {
+    double weight = 0.0;
+    for (Key k : keyList) {
+      weight += k.getWeight();
+    }
+    return weight;
+  }
+  
+  /**
+   * Creates and initialises the various vectors.
+   */
+  @SuppressWarnings("unchecked")
+  private void createVector() {
+    fpVector = new List[vectorSize];
+    keyVector = new List[vectorSize];
+    ratio = new double[vectorSize];
+
+    for (int i = 0; i < vectorSize; i++) {
+      fpVector[i] = Collections.synchronizedList(new ArrayList<Key>());
+      keyVector[i] = Collections.synchronizedList(new ArrayList<Key>());
+      ratio[i] = 0.0;
+    }
+  }
+  
+  // Writable
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    for (int i = 0; i < fpVector.length; i++) {
+      List<Key> list = fpVector[i];
+      out.writeInt(list.size());
+      for (Key k : list) {
+        k.write(out);
+      }
+    }
+    for (int i = 0; i < keyVector.length; i++) {
+      List<Key> list = keyVector[i];
+      out.writeInt(list.size());
+      for (Key k : list) {
+        k.write(out);
+      }
+    }
+    for (int i = 0; i < ratio.length; i++) {
+      out.writeDouble(ratio[i]);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    createVector();
+    for (int i = 0; i < fpVector.length; i++) {
+      List<Key> list = fpVector[i];
+      int size = in.readInt();
+      for (int j = 0; j < size; j++) {
+        Key k = new Key();
+        k.readFields(in);
+        list.add(k);
+      }
+    }
+    for (int i = 0; i < keyVector.length; i++) {
+      List<Key> list = keyVector[i];
+      int size = in.readInt();
+      for (int j = 0; j < size; j++) {
+        Key k = new Key();
+        k.readFields(in);
+        list.add(k);
+      }
+    }
+    for (int i = 0; i < ratio.length; i++) {
+      ratio[i] = in.readDouble();
+    }
+  }
+}
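
Putting the pieces together: a hedged end-to-end sketch of selective clearing. The vector size and keys are illustrative, and the clearing step only runs when the phantom key actually tests positive, since selectiveClearing rejects non-members:

import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.bloom.RemoveScheme;
import org.apache.hadoop.util.bloom.RetouchedBloomFilter;
import org.apache.hadoop.util.hash.Hash;

public class RetouchedSketch {
  public static void main(String[] args) {
    RetouchedBloomFilter filter =
        new RetouchedBloomFilter(1024, 4, Hash.MURMUR_HASH);

    filter.add(new Key("stored-row".getBytes()));

    Key phantom = new Key("never-stored".getBytes());
    if (filter.membershipTest(phantom)) {         // an observed false positive
      filter.addFalsePositive(phantom);           // record it in fpVector
      filter.selectiveClearing(phantom, RemoveScheme.RATIO);
      // phantom no longer matches, at the possible cost of turning stored
      // keys that shared the cleared bit into false negatives
    }
  }
}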

+ 119 - 0
src/core/org/apache/hadoop/util/hash/Hash.java

@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util.hash;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * This class represents a common API for hashing functions.
+ */
+public abstract class Hash {
+  /** Constant to denote invalid hash type. */
+  public static final int INVALID_HASH = -1;
+  /** Constant to denote {@link JenkinsHash}. */
+  public static final int JENKINS_HASH = 0;
+  /** Constant to denote {@link MurmurHash}. */
+  public static final int MURMUR_HASH  = 1;
+  
+  /**
+   * This utility method converts String representation of hash function name
+   * to a symbolic constant. Currently two function types are supported,
+   * "jenkins" and "murmur".
+   * @param name hash function name
+   * @return one of the predefined constants
+   */
+  public static int parseHashType(String name) {
+    if ("jenkins".equalsIgnoreCase(name)) {
+      return JENKINS_HASH;
+    } else if ("murmur".equalsIgnoreCase(name)) {
+      return MURMUR_HASH;
+    } else {
+      return INVALID_HASH;
+    }
+  }
+  
+  /**
+   * This utility method converts the name of the configured
+   * hash type to a symbolic constant.
+   * @param conf configuration
+   * @return one of the predefined constants
+   */
+  public static int getHashType(Configuration conf) {
+    String name = conf.get("hadoop.util.hash.type", "murmur");
+    return parseHashType(name);
+  }
+  
+  /**
+   * Get a singleton instance of hash function of a given type.
+   * @param type predefined hash type
+   * @return hash function instance, or null if type is invalid
+   */
+  public static Hash getInstance(int type) {
+    switch(type) {
+    case JENKINS_HASH:
+      return JenkinsHash.getInstance();
+    case MURMUR_HASH:
+      return MurmurHash.getInstance();
+    default:
+      return null;
+    }
+  }
+  
+  /**
+   * Get a singleton instance of hash function of a type
+   * defined in the configuration.
+   * @param conf current configuration
+   * @return hash function instance, or null if the configured type is invalid
+   */
+  public static Hash getInstance(Configuration conf) {
+    int type = getHashType(conf);
+    return getInstance(type);
+  }
+  
+  /**
+   * Calculate a hash using all bytes from the input argument, and
+   * a seed of -1.
+   * @param bytes input bytes
+   * @return hash value
+   */
+  public int hash(byte[] bytes) {
+    return hash(bytes, bytes.length, -1);
+  }
+  
+  /**
+   * Calculate a hash using all bytes from the input argument,
+   * and a provided seed value.
+   * @param bytes input bytes
+   * @param initval seed value
+   * @return hash value
+   */
+  public int hash(byte[] bytes, int initval) {
+    return hash(bytes, bytes.length, initval);
+  }
+  
+  /**
+   * Calculate a hash using bytes from 0 to <code>length</code>, and
+   * the provided seed value
+   * @param bytes input bytes
+   * @param length length of the valid bytes to consider
+   * @param initval seed value
+   * @return hash value
+   */
+  public abstract int hash(byte[] bytes, int length, int initval);
+}
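
How the factory ties into configuration: a small sketch using only the methods defined above; the property value must be one of the two names parseHashType accepts:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.hash.Hash;

public class HashFactorySketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set("hadoop.util.hash.type", "jenkins");  // default is "murmur"

    Hash h = Hash.getInstance(conf);               // singleton JenkinsHash
    byte[] data = "example".getBytes();
    System.out.println(h.hash(data));              // all bytes, seed -1
    System.out.println(h.hash(data, 42));          // all bytes, explicit seed
  }
}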

+ 260 - 0
src/core/org/apache/hadoop/util/hash/JenkinsHash.java

@@ -0,0 +1,260 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util.hash;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+
+/**
+ * Produces 32-bit hash for hash table lookup.
+ * 
+ * <pre>lookup3.c, by Bob Jenkins, May 2006, Public Domain.
+ *
+ * You can use this free for any purpose.  It's in the public domain.
+ * It has no warranty.
+ * </pre>
+ * 
+ * @see <a href="http://burtleburtle.net/bob/c/lookup3.c">lookup3.c</a>
+ * @see <a href="http://www.ddj.com/184410284">Hash Functions (and how this
+ * function compares to others such as CRC, MD?, etc.)</a>
+ * @see <a href="http://burtleburtle.net/bob/hash/doobs.html">Update on the
+ * Dr. Dobbs article</a>
+ */
+public class JenkinsHash extends Hash {
+  private static final long INT_MASK  = 0x00000000ffffffffL;
+  private static final long BYTE_MASK = 0x00000000000000ffL;
+  
+  private static JenkinsHash _instance = new JenkinsHash();
+  
+  public static Hash getInstance() {
+    return _instance;
+  }
+
+  private static long rot(long val, int pos) {
+    return ((Integer.rotateLeft(
+        (int)(val & INT_MASK), pos)) & INT_MASK);
+  }
+
+  /**
+   * taken from  hashlittle() -- hash a variable-length key into a 32-bit value
+   * 
+   * @param key the key (the unaligned variable-length array of bytes)
+   * @param nbytes number of bytes to include in hash
+   * @param initval can be any integer value
+   * @return a 32-bit value.  Every bit of the key affects every bit of the
+   * return value.  Two keys differing by one or two bits will have totally
+   * different hash values.
+   * 
+   * <p>The best hash table sizes are powers of 2.  There is no need to do mod
+   * a prime (mod is sooo slow!).  If you need less than 32 bits, use a bitmask.
+   * For example, if you need only 10 bits, do
+   * <code>h = (h & hashmask(10));</code>
+   * In which case, the hash table should have hashsize(10) elements.
+   * 
+   * <p>If you are hashing n strings byte[][] k, do it like this:
+   * for (int i = 0, h = 0; i < n; ++i) h = hash( k[i], h);
+   * 
+   * <p>By Bob Jenkins, 2006.  bob_jenkins@burtleburtle.net.  You may use this
+   * code any way you wish, private, educational, or commercial.  It's free.
+   * 
+   * <p>Use for hash table lookup, or anything where one collision in 2^^32 is
+   * acceptable.  Do NOT use for cryptographic purposes.
+  */
+  @SuppressWarnings("fallthrough")
+  public int hash(byte[] key, int nbytes, int initval) {
+    int length = nbytes;
+    long a, b, c;       // We use longs because we don't have unsigned ints
+    a = b = c = (0x00000000deadbeefL + length + initval) & INT_MASK;
+    int offset = 0;
+    for (; length > 12; offset += 12, length -= 12) {
+      a = (a + (key[offset + 0]    & BYTE_MASK)) & INT_MASK;
+      a = (a + (((key[offset + 1]  & BYTE_MASK) <<  8) & INT_MASK)) & INT_MASK;
+      a = (a + (((key[offset + 2]  & BYTE_MASK) << 16) & INT_MASK)) & INT_MASK;
+      a = (a + (((key[offset + 3]  & BYTE_MASK) << 24) & INT_MASK)) & INT_MASK;
+      b = (b + (key[offset + 4]    & BYTE_MASK)) & INT_MASK;
+      b = (b + (((key[offset + 5]  & BYTE_MASK) <<  8) & INT_MASK)) & INT_MASK;
+      b = (b + (((key[offset + 6]  & BYTE_MASK) << 16) & INT_MASK)) & INT_MASK;
+      b = (b + (((key[offset + 7]  & BYTE_MASK) << 24) & INT_MASK)) & INT_MASK;
+      c = (c + (key[offset + 8]    & BYTE_MASK)) & INT_MASK;
+      c = (c + (((key[offset + 9]  & BYTE_MASK) <<  8) & INT_MASK)) & INT_MASK;
+      c = (c + (((key[offset + 10] & BYTE_MASK) << 16) & INT_MASK)) & INT_MASK;
+      c = (c + (((key[offset + 11] & BYTE_MASK) << 24) & INT_MASK)) & INT_MASK;
+      
+      /*
+       * mix -- mix 3 32-bit values reversibly.
+       * This is reversible, so any information in (a,b,c) before mix() is
+       * still in (a,b,c) after mix().
+       * 
+       * If four pairs of (a,b,c) inputs are run through mix(), or through
+       * mix() in reverse, there are at least 32 bits of the output that
+       * are sometimes the same for one pair and different for another pair.
+       * 
+       * This was tested for:
+       * - pairs that differed by one bit, by two bits, in any combination
+       *   of top bits of (a,b,c), or in any combination of bottom bits of
+       *   (a,b,c).
+       * - "differ" is defined as +, -, ^, or ~^.  For + and -, I transformed
+       *   the output delta to a Gray code (a^(a>>1)) so a string of 1's (as
+       *    is commonly produced by subtraction) look like a single 1-bit
+       *    difference.
+       * - the base values were pseudorandom, all zero but one bit set, or
+       *   all zero plus a counter that starts at zero.
+       * 
+       * Some k values for my "a-=c; a^=rot(c,k); c+=b;" arrangement that
+       * satisfy this are
+       *     4  6  8 16 19  4
+       *     9 15  3 18 27 15
+       *    14  9  3  7 17  3
+       * Well, "9 15 3 18 27 15" didn't quite get 32 bits diffing for 
+       * "differ" defined as + with a one-bit base and a two-bit delta.  I
+       * used http://burtleburtle.net/bob/hash/avalanche.html to choose
+       * the operations, constants, and arrangements of the variables.
+       * 
+       * This does not achieve avalanche.  There are input bits of (a,b,c)
+       * that fail to affect some output bits of (a,b,c), especially of a.
+       * The most thoroughly mixed value is c, but it doesn't really even
+       * achieve avalanche in c.
+       * 
+       * This allows some parallelism.  Read-after-writes are good at doubling
+       * the number of bits affected, so the goal of mixing pulls in the
+       * opposite direction as the goal of parallelism.  I did what I could.
+       * Rotates seem to cost as much as shifts on every machine I could lay
+       * my hands on, and rotates are much kinder to the top and bottom bits,
+       * so I used rotates.
+       *
+       * #define mix(a,b,c) \
+       * { \
+       *   a -= c;  a ^= rot(c, 4);  c += b; \
+       *   b -= a;  b ^= rot(a, 6);  a += c; \
+       *   c -= b;  c ^= rot(b, 8);  b += a; \
+       *   a -= c;  a ^= rot(c,16);  c += b; \
+       *   b -= a;  b ^= rot(a,19);  a += c; \
+       *   c -= b;  c ^= rot(b, 4);  b += a; \
+       * }
+       * 
+       * mix(a,b,c);
+       */
+      a = (a - c) & INT_MASK;  a ^= rot(c, 4);  c = (c + b) & INT_MASK;
+      b = (b - a) & INT_MASK;  b ^= rot(a, 6);  a = (a + c) & INT_MASK;
+      c = (c - b) & INT_MASK;  c ^= rot(b, 8);  b = (b + a) & INT_MASK;
+      a = (a - c) & INT_MASK;  a ^= rot(c,16);  c = (c + b) & INT_MASK;
+      b = (b - a) & INT_MASK;  b ^= rot(a,19);  a = (a + c) & INT_MASK;
+      c = (c - b) & INT_MASK;  c ^= rot(b, 4);  b = (b + a) & INT_MASK;
+    }
+
+    //-------------------------------- last block: affect all 32 bits of (c)
+    switch (length) {                   // all the case statements fall through
+    case 12:
+      c = (c + (((key[offset + 11] & BYTE_MASK) << 24) & INT_MASK)) & INT_MASK;
+    case 11:
+      c = (c + (((key[offset + 10] & BYTE_MASK) << 16) & INT_MASK)) & INT_MASK;
+    case 10:
+      c = (c + (((key[offset + 9]  & BYTE_MASK) <<  8) & INT_MASK)) & INT_MASK;
+    case  9:
+      c = (c + (key[offset + 8]    & BYTE_MASK)) & INT_MASK;
+    case  8:
+      b = (b + (((key[offset + 7]  & BYTE_MASK) << 24) & INT_MASK)) & INT_MASK;
+    case  7:
+      b = (b + (((key[offset + 6]  & BYTE_MASK) << 16) & INT_MASK)) & INT_MASK;
+    case  6:
+      b = (b + (((key[offset + 5]  & BYTE_MASK) <<  8) & INT_MASK)) & INT_MASK;
+    case  5:
+      b = (b + (key[offset + 4]    & BYTE_MASK)) & INT_MASK;
+    case  4:
+      a = (a + (((key[offset + 3]  & BYTE_MASK) << 24) & INT_MASK)) & INT_MASK;
+    case  3:
+      a = (a + (((key[offset + 2]  & BYTE_MASK) << 16) & INT_MASK)) & INT_MASK;
+    case  2:
+      a = (a + (((key[offset + 1]  & BYTE_MASK) <<  8) & INT_MASK)) & INT_MASK;
+    case  1:
+      a = (a + (key[offset + 0]    & BYTE_MASK)) & INT_MASK;
+      break;
+    case  0:
+      return (int)(c & INT_MASK);
+    }
+    /*
+     * final -- final mixing of 3 32-bit values (a,b,c) into c
+     * 
+     * Pairs of (a,b,c) values differing in only a few bits will usually
+     * produce values of c that look totally different.  This was tested for
+     * - pairs that differed by one bit, by two bits, in any combination
+     *   of top bits of (a,b,c), or in any combination of bottom bits of
+     *   (a,b,c).
+     * 
+     * - "differ" is defined as +, -, ^, or ~^.  For + and -, I transformed
+     *   the output delta to a Gray code (a^(a>>1)) so a string of 1's (as
+     *   is commonly produced by subtraction) look like a single 1-bit
+     *   difference.
+     * 
+     * - the base values were pseudorandom, all zero but one bit set, or
+     *   all zero plus a counter that starts at zero.
+     * 
+     * These constants passed:
+     *   14 11 25 16 4 14 24
+     *   12 14 25 16 4 14 24
+     * and these came close:
+     *    4  8 15 26 3 22 24
+     *   10  8 15 26 3 22 24
+     *   11  8 15 26 3 22 24
+     * 
+     * #define final(a,b,c) \
+     * { 
+     *   c ^= b; c -= rot(b,14); \
+     *   a ^= c; a -= rot(c,11); \
+     *   b ^= a; b -= rot(a,25); \
+     *   c ^= b; c -= rot(b,16); \
+     *   a ^= c; a -= rot(c,4);  \
+     *   b ^= a; b -= rot(a,14); \
+     *   c ^= b; c -= rot(b,24); \
+     * }
+     * 
+     */
+    c ^= b; c = (c - rot(b,14)) & INT_MASK;
+    a ^= c; a = (a - rot(c,11)) & INT_MASK;
+    b ^= a; b = (b - rot(a,25)) & INT_MASK;
+    c ^= b; c = (c - rot(b,16)) & INT_MASK;
+    a ^= c; a = (a - rot(c,4))  & INT_MASK;
+    b ^= a; b = (b - rot(a,14)) & INT_MASK;
+    c ^= b; c = (c - rot(b,24)) & INT_MASK;
+
+    return (int)(c & INT_MASK);
+  }
+  
+  /**
+   * Compute the hash of the specified file
+   * @param args name of file to compute hash of.
+   * @throws IOException
+   */
+  public static void main(String[] args) throws IOException {
+    if (args.length != 1) {
+      System.err.println("Usage: JenkinsHash filename");
+      System.exit(-1);
+    }
+    FileInputStream in = new FileInputStream(args[0]);
+    byte[] bytes = new byte[512];
+    int value = 0;
+    JenkinsHash hash = new JenkinsHash();
+    for (int length = in.read(bytes); length > 0 ; length = in.read(bytes)) {
+      value = hash.hash(bytes, length, value);
+    }
+    System.out.println(Math.abs(value));
+  }
+}
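
The `& INT_MASK` on nearly every line compensates for Java's lack of an unsigned 32-bit type: each value lives in the low 32 bits of a long and must be re-masked whenever addition or subtraction can carry into bit 32. A self-contained illustration of the idiom:

public class UnsignedMaskSketch {
  private static final long INT_MASK = 0x00000000ffffffffL;

  public static void main(String[] args) {
    long a = 0xffffffffL;                    // largest unsigned 32-bit value
    System.out.println((a + 1) & INT_MASK);  // 0: wraps like uint32 addition
    System.out.println(a + 1);               // 4294967296: unmasked carry survives
  }
}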

+ 83 - 0
src/core/org/apache/hadoop/util/hash/MurmurHash.java

@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util.hash;
+
+/**
+ * This is a very fast, non-cryptographic hash suitable for general hash-based
+ * lookup.  See http://murmurhash.googlepages.com/ for more details.
+ * 
+ * <p>The C version of MurmurHash 2.0 found at that site was ported
+ * to Java by Andrzej Bialecki (ab at getopt org).</p>
+ */
+public class MurmurHash extends Hash {
+  private static MurmurHash _instance = new MurmurHash();
+  
+  public static Hash getInstance() {
+    return _instance;
+  }
+  
+  public int hash(byte[] data, int length, int seed) {
+    int m = 0x5bd1e995;
+    int r = 24;
+
+    int h = seed ^ length;
+
+    int len_4 = length >> 2;
+
+    for (int i = 0; i < len_4; i++) {
+      int i_4 = i << 2;
+      int k = data[i_4 + 3];
+      k = k << 8;
+      k = k | (data[i_4 + 2] & 0xff);
+      k = k << 8;
+      k = k | (data[i_4 + 1] & 0xff);
+      k = k << 8;
+      k = k | (data[i_4 + 0] & 0xff);
+      k *= m;
+      k ^= k >>> r;
+      k *= m;
+      h *= m;
+      h ^= k;
+    }
+
+    // avoid calculating modulo
+    int len_m = len_4 << 2;
+    int left = length - len_m;
+
+    if (left != 0) {
+      if (left >= 3) {
+        h ^= (int) data[length - 3] << 16;
+      }
+      if (left >= 2) {
+        h ^= (int) data[length - 2] << 8;
+      }
+      if (left >= 1) {
+        h ^= (int) data[length - 1];
+      }
+
+      h *= m;
+    }
+
+    h ^= h >>> 13;
+    h *= m;
+    h ^= h >>> 15;
+
+    return h;
+  }
+}
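
Usage mirrors JenkinsHash; chaining the previous hash as the next seed, as HashFunction does, looks roughly like this (the input bytes are illustrative):

import org.apache.hadoop.util.hash.Hash;
import org.apache.hadoop.util.hash.MurmurHash;

public class MurmurSketch {
  public static void main(String[] args) {
    Hash murmur = MurmurHash.getInstance();
    byte[] data = "hello bloom".getBytes();

    int h1 = murmur.hash(data);      // seed defaults to -1
    int h2 = murmur.hash(data, h1);  // previous hash seeds the next round
    System.out.println(h1 + " " + h2);
  }
}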

+ 70 - 0
src/test/org/apache/hadoop/io/TestBloomMapFile.java

@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import junit.framework.TestCase;
+
+public class TestBloomMapFile extends TestCase {
+  private static Configuration conf = new Configuration();
+  
+  public void testMembershipTest() throws Exception {
+    // write the file
+    Path dirName = new Path(System.getProperty("test.build.data", "."),
+        getName() + ".bloommapfile");
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path qualifiedDirName = fs.makeQualified(dirName);
+    conf.setInt("io.mapfile.bloom.size", 2048);
+    BloomMapFile.Writer writer = new BloomMapFile.Writer(conf, fs,
+      qualifiedDirName.toString(), IntWritable.class, Text.class);
+    IntWritable key = new IntWritable();
+    Text value = new Text();
+    for (int i = 0; i < 2000; i += 2) {
+      key.set(i);
+      value.set("00" + i);
+      writer.append(key, value);
+    }
+    writer.close();
+    
+    BloomMapFile.Reader reader = new BloomMapFile.Reader(fs,
+        qualifiedDirName.toString(), conf);
+    // check the false positive rate
+    int falsePos = 0;
+    int falseNeg = 0;
+    for (int i = 0; i < 2000; i++) {
+      key.set(i);
+      boolean exists = reader.probablyHasKey(key);
+      if (i % 2 == 0) {
+        if (!exists) falseNeg++;
+      } else {
+        if (exists) falsePos++;
+      }
+    }
+    reader.close();
+    fs.delete(qualifiedDirName, true);
+    System.out.println("False negatives: " + falseNeg);
+    assertEquals(0, falseNeg);
+    System.out.println("False positives: " + falsePos);
+    assertTrue(falsePos < 2);
+  }
+
+}
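
Two notes on the assertions. A plain Bloom filter can never report a stored key as absent, which is why the test demands exactly zero false negatives. The false positive bound follows from the standard estimate p = (1 - e^(-kn/m))^k for m bits, k hash functions, and n inserted keys. The sketch below evaluates that formula; the m and k values are illustrative assumptions, since BloomMapFile derives its real parameters internally from its configuration:

public class FpEstimateSketch {
  public static void main(String[] args) {
    int n = 1000;       // even keys 0..1998 written by the test
    int m = 16 * 1024;  // assumed bit-vector size
    int k = 5;          // assumed number of hash functions

    double p = Math.pow(1.0 - Math.exp(-(double) k * n / m), k);
    System.out.printf("estimated false positive rate: %.5f%n", p);
    // roughly 0.0013 here; over the test's 1000 negative probes that is about
    // one expected false positive, consistent with asserting falsePos < 2
  }
}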