Browse Source

HADOOP-1606 Updated implementation of RowFilterSet, RowFilterInterface

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@558243 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 18 years ago
parent
commit
1418653e33

+ 2 - 0
src/contrib/hbase/CHANGES.txt

@@ -67,3 +67,5 @@ Trunk (unreleased changes)
  43. HADOOP-1616 Sporadic TestTable failures
  44. HADOOP-1615 Replacing thread notification-based queue with 
      java.util.concurrent.BlockingQueue in HMaster, HRegionServer
+ 45. HADOOP-1606 Updated implementation of RowFilterSet, RowFilterInterface
+     (Izaak Rubin via Stack)

+ 2 - 2
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java

@@ -1485,8 +1485,8 @@ public class HRegion implements HConstants {
         moreToFollow = chosenTimestamp > 0;
         
         if (dataFilter != null) {
-          if (moreToFollow && !filtered) {
-            dataFilter.acceptedRow(chosenRow);
+          if (moreToFollow) {
+            dataFilter.rowProcessed(filtered, chosenRow);
           }
           if (dataFilter.filterAllRemaining()) {
             moreToFollow = false;

+ 25 - 5
src/contrib/hbase/src/java/org/apache/hadoop/hbase/filter/PageRowFilter.java

@@ -24,6 +24,8 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.TreeMap;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;
 
 /**
@@ -44,6 +46,8 @@ public class PageRowFilter implements RowFilterInterface {
   private long pageSize = Long.MAX_VALUE;
   private int rowsAccepted = 0;
 
+  static final Log LOG = LogFactory.getLog(PageRowFilter.class);
+  
   /**
    * Default constructor, filters nothing. Required though for RPC
    * deserialization.
@@ -81,8 +85,22 @@ public class PageRowFilter implements RowFilterInterface {
    * 
    * {@inheritDoc}
    */
-  public void acceptedRow(@SuppressWarnings("unused") final Text key) {
-    rowsAccepted++;
+  public void rowProcessed(boolean filtered, Text rowKey) {
+    if (!filtered) {
+      this.rowsAccepted++;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("rowProcessed incremented rowsAccepted to " + 
+          this.rowsAccepted);
+      }
+    }
+  }
+
+  /**
+   * 
+   * {@inheritDoc}
+   */
+  public boolean processAlways() {
+    return false;
   }
 
   /**
@@ -90,10 +108,12 @@ public class PageRowFilter implements RowFilterInterface {
    * {@inheritDoc}
    */
   public boolean filterAllRemaining() {
-    if (this.rowsAccepted > this.pageSize) {
-      return true;
+    boolean result = this.rowsAccepted > this.pageSize;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("filtering decision is " + result + " with rowsAccepted: " + 
+        this.rowsAccepted);
     }
-    return false;
+    return result;
   }
 
   /**

+ 46 - 6
src/contrib/hbase/src/java/org/apache/hadoop/hbase/filter/RegExpRowFilter.java

@@ -31,6 +31,8 @@ import java.util.TreeMap;
 import java.util.Map.Entry;
 import java.util.regex.Pattern;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.io.Text;
 
@@ -47,6 +49,8 @@ public class RegExpRowFilter implements RowFilterInterface {
   private Map<Text, byte[]> equalsMap = new HashMap<Text, byte[]>();
   private Set<Text> nullColumns = new HashSet<Text>();
 
+  static final Log LOG = LogFactory.getLog(RegExpRowFilter.class);
+  
   /**
    * Default constructor, filters nothing. Required though for RPC
    * deserialization.
@@ -80,10 +84,18 @@ public class RegExpRowFilter implements RowFilterInterface {
    * 
    * {@inheritDoc}
    */
-  public void acceptedRow(@SuppressWarnings("unused") final Text key) {
+  public void rowProcessed(boolean filtered, Text rowKey) {
     //doesn't care
   }
 
+  /**
+   * 
+   * {@inheritDoc}
+   */
+  public boolean processAlways() {
+    return false;
+  }
+  
   /**
    * Specify a value that must be matched for the given column.
    * 
@@ -93,7 +105,7 @@ public class RegExpRowFilter implements RowFilterInterface {
    *          the value that must equal the stored value.
    */
   public void setColumnFilter(final Text colKey, final byte[] value) {
-    if (null == value) {
+    if (value == null) {
       nullColumns.add(colKey);
     } else {
       equalsMap.put(colKey, value);
@@ -139,7 +151,11 @@ public class RegExpRowFilter implements RowFilterInterface {
    */
   public boolean filter(final Text rowKey) {
     if (filtersByRowKey() && rowKey != null) {
-      return !getRowKeyPattern().matcher(rowKey.toString()).matches();
+      boolean result = !getRowKeyPattern().matcher(rowKey.toString()).matches();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("filter returning " + result + " for rowKey: " + rowKey);
+      }
+      return result;
     }
     return false;
   }
@@ -156,14 +172,27 @@ public class RegExpRowFilter implements RowFilterInterface {
     if (filtersByColumnValue()) {
       byte[] filterValue = equalsMap.get(colKey);
       if (null != filterValue) {
-        return !Arrays.equals(filterValue, data);
+        boolean result = !Arrays.equals(filterValue, data);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("filter returning " + result + " for rowKey: " + rowKey + 
+            " colKey: " + colKey);
+        }
+        return result;
       }
     }
     if (nullColumns.contains(colKey)) {
       if (data != null && !Arrays.equals(HConstants.DELETE_BYTES.get(), data)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("filter returning true for rowKey: " + rowKey + 
+            " colKey: " + colKey);
+        }
         return true;
       }
     }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("filter returning false for rowKey: " + rowKey + " colKey: " + 
+        colKey);
+    }
     return false;
   }
 
@@ -175,14 +204,25 @@ public class RegExpRowFilter implements RowFilterInterface {
     for (Entry<Text, byte[]> col : columns.entrySet()) {
       if (nullColumns.contains(col.getKey())
           && !Arrays.equals(HConstants.DELETE_BYTES.get(), col.getValue())) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("filterNotNull returning true for colKey: " + col.getKey()
+            + ", column should be null.");
+        }
         return true;
       }
     }
     for (Text col : equalsMap.keySet()) {
       if (!columns.containsKey(col)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("filterNotNull returning true for colKey: " + col + 
+            ", column not found in given TreeMap<Text, byte[]>.");
+        }
         return true;
       }
     }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("filterNotNull returning false.");
+    }
     return false;
   }
 
@@ -215,7 +255,7 @@ public class RegExpRowFilter implements RowFilterInterface {
   public void readFields(final DataInput in) throws IOException {
     boolean hasRowKeyPattern = in.readBoolean();
     if (hasRowKeyPattern) {
-      rowKeyRegExp = in.readLine();
+      rowKeyRegExp = in.readUTF();
     }
     // equals map
     equalsMap.clear();
@@ -283,7 +323,7 @@ public class RegExpRowFilter implements RowFilterInterface {
       out.writeBoolean(false);
     } else {
       out.writeBoolean(true);
-      out.writeChars(getRowKeyRegExp());
+      out.writeUTF(getRowKeyRegExp());
     }
 
     // equalsMap

+ 28 - 14
src/contrib/hbase/src/java/org/apache/hadoop/hbase/filter/RowFilterInterface.java

@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.filter;
 
 import java.util.TreeMap;
 
-import org.apache.hadoop.hbase.HRegion;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 
@@ -39,17 +38,30 @@ public interface RowFilterInterface extends Writable {
   void reset();
 
   /**
-   * Called to let filter know that the specified row has been included in the
-   * results (passed all filtering). With out HScanner calling this, the filter
-   * does not know if a row passed filtering even if it passed the row itself
-   * because other filters may have failed the row. E.g. when this filter is a
-   * member of a RowFilterSet with an OR operator.
+   * Called to let filter know the final decision (to pass or filter) on a 
+   * given row.  With out HScanner calling this, the filter does not know if a 
+   * row passed filtering even if it passed the row itself because other 
+   * filters may have failed the row. E.g. when this filter is a member of a 
+   * RowFilterSet with an OR operator.
    * 
    * @see RowFilterSet
    * @param key
    */
-  void acceptedRow(final Text key);
+  void rowProcessed(boolean filtered, Text key);
 
+  /**
+   * Returns whether or not the filter should always be processed in any 
+   * filtering call.  This precaution is necessary for filters that maintain 
+   * state and need to be updated according to their response to filtering 
+   * calls (see WhileMatchRowFilter for an example).  At times, filters nested 
+   * in RowFilterSets may or may not be called because the RowFilterSet 
+   * determines a result as fast as possible.  Returning true for 
+   * processAlways() ensures that the filter will always be called.
+   * 
+   * @return whether or not to always process the filter
+   */
+  boolean processAlways();
+  
   /**
    * Determines if the filter has decided that all remaining results should be
    * filtered (skipped). This is used to prevent the scanner from scanning a
@@ -82,13 +94,15 @@ public interface RowFilterInterface extends Writable {
   boolean filter(final Text rowKey, final Text colKey, final byte[] data);
 
   /**
-   * Filters row if given columns are non-null and have null criteria or if
-   * there exists criteria on columns not included in the column set. A column
-   * is considered null if it:
-   * <ul>
-   * <li>Is not included in the given columns.</li>
-   * <li>Has a value of HConstants.DELETE_BYTES</li>
-   * </ul>
+   * Filters a row if:
+   * 1) The given row (@param columns) has a columnKey expected to be null AND 
+   * the value associated with that columnKey is non-null.
+   * 2) The filter has a criterion for a particular columnKey, but that 
+   * columnKey is not in the given row (@param columns).
+   * 
+   * Note that filterNotNull does not care whether the values associated with a 
+   * columnKey match.  Also note that a "null value" associated with a columnKey 
+   * is expressed as HConstants.DELETE_BYTES.
    * 
    * @param columns
    * @return true if null/non-null criteria not met.

+ 145 - 40
src/contrib/hbase/src/java/org/apache/hadoop/hbase/filter/RowFilterSet.java

@@ -26,23 +26,27 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.TreeMap;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;
 
 /**
  * Implementation of RowFilterInterface that represents a set of RowFilters
- * which will be evaluated with a specified boolean operator AND/OR. Since you
- * can use RowFilterSets as children of RowFilterSet, you can create a
- * hierarchy of filters to be evaluated.
+ * which will be evaluated with a specified boolean operator MUST_PASS_ALL 
+ * (!AND) or MUST_PASS_ONE (!OR).  Since you can use RowFilterSets as children 
+ * of RowFilterSet, you can create a hierarchy of filters to be evaluated.
  */
 public class RowFilterSet implements RowFilterInterface {
 
-  enum Operator {
-    AND, OR
+  public static enum Operator {
+    MUST_PASS_ALL, MUST_PASS_ONE
   }
 
-  private Operator operator = Operator.AND;
+  private Operator operator = Operator.MUST_PASS_ALL;
   private Set<RowFilterInterface> filters = new HashSet<RowFilterInterface>();
 
+  static final Log LOG = LogFactory.getLog(RowFilterSet.class);
+  
   /**
    * Default constructor, filters nothing. Required though for RPC
    * deserialization.
@@ -52,8 +56,8 @@ public class RowFilterSet implements RowFilterInterface {
   }
 
   /**
-   * Constructor that takes a set of RowFilters. The default operator AND is
-   * assumed.
+   * Constructor that takes a set of RowFilters. The default operator 
+   * MUST_PASS_ALL is assumed.
    * 
    * @param rowFilters
    */
@@ -80,6 +84,10 @@ public class RowFilterSet implements RowFilterInterface {
   public void validate(final Text[] columns) {
     for (RowFilterInterface filter : filters) {
       filter.validate(columns);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Validated subfilter of type " + 
+          filter.getClass().getSimpleName());
+      }
     }
   }
 
@@ -90,6 +98,10 @@ public class RowFilterSet implements RowFilterInterface {
   public void reset() {
     for (RowFilterInterface filter : filters) {
       filter.reset();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Reset subfilter of type " + 
+          filter.getClass().getSimpleName());
+      }
     }
   }
 
@@ -97,29 +109,61 @@ public class RowFilterSet implements RowFilterInterface {
    * 
    * {@inheritDoc}
    */
-  public void acceptedRow(final Text key) {
+  public void rowProcessed(boolean filtered, Text rowKey) {
     for (RowFilterInterface filter : filters) {
-      filter.acceptedRow(key);
+      filter.rowProcessed(filtered, rowKey);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Called rowProcessed on subfilter of type " + 
+          filter.getClass().getSimpleName());
+      }
     }
   }
 
+  /**
+   * 
+   * {@inheritDoc}
+   */
+  public boolean processAlways() {
+    for (RowFilterInterface filter : filters) {
+      if (filter.processAlways()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("processAlways() is true due to subfilter of type " + 
+            filter.getClass().getSimpleName());
+        }
+        return true;
+      }
+    }
+    return false;
+  }
+  
   /**
    * 
    * {@inheritDoc}
    */
   public boolean filterAllRemaining() {
-    boolean result = operator == Operator.OR;
+    boolean result = operator == Operator.MUST_PASS_ONE;
     for (RowFilterInterface filter : filters) {
-      if (operator == Operator.AND) {
+      if (operator == Operator.MUST_PASS_ALL) {
         if (filter.filterAllRemaining()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("op.MPALL filterAllRemaining returning true due" + 
+              " to subfilter of type " + filter.getClass().getSimpleName());
+          }
           return true;
         }
-      } else if (operator == Operator.OR) {
+      } else if (operator == Operator.MUST_PASS_ONE) {
         if (!filter.filterAllRemaining()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("op.MPONE filterAllRemaining returning false due" + 
+              " to subfilter of type " + filter.getClass().getSimpleName());
+          }
           return false;
         }
       }
     }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("filterAllRemaining default returning " + result);
+    }
     return result;
   }
 
@@ -128,40 +172,79 @@ public class RowFilterSet implements RowFilterInterface {
    * {@inheritDoc}
    */
   public boolean filter(final Text rowKey) {
-    boolean result = operator == Operator.OR;
+    boolean resultFound = false;
+    boolean result = operator == Operator.MUST_PASS_ONE;
     for (RowFilterInterface filter : filters) {
-      if (operator == Operator.AND) {
-        if (filter.filterAllRemaining() || filter.filter(rowKey)) {
-          return true;
-        }
-      } else if (operator == Operator.OR) {
-        if (!filter.filterAllRemaining() && !filter.filter(rowKey)) {
-          return false;
+      if (!resultFound) {
+        if (operator == Operator.MUST_PASS_ALL) {
+          if (filter.filterAllRemaining() || filter.filter(rowKey)) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("op.MPALL filter(Text) will return true due" + 
+                " to subfilter of type " + filter.getClass().getSimpleName());
+            }
+            result = true;
+            resultFound = true;
+          }
+        } else if (operator == Operator.MUST_PASS_ONE) {
+          if (!filter.filterAllRemaining() && !filter.filter(rowKey)) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("op.MPONE filter(Text) will return false due" + 
+                " to subfilter of type " + filter.getClass().getSimpleName());
+            }
+            result = false;
+            resultFound = true;
+          }
         }
+      } else if (filter.processAlways()) {
+        filter.filter(rowKey);
       }
     }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("filter(Text) returning " + result);
+    }
     return result;
-
   }
 
   /**
    * 
    * {@inheritDoc}
    */
-  public boolean filter(final Text rowKey, final Text colKey, final byte[] data) {
-    boolean result = operator == Operator.OR;
+  public boolean filter(final Text rowKey, final Text colKey, 
+    final byte[] data) {
+    boolean resultFound = false;
+    boolean result = operator == Operator.MUST_PASS_ONE;
     for (RowFilterInterface filter : filters) {
-      if (operator == Operator.AND) {
-        if (filter.filterAllRemaining() || filter.filter(rowKey, colKey, data)) {
-          return true;
-        }
-      } else if (operator == Operator.OR) {
-        if (!filter.filterAllRemaining()
-            && !filter.filter(rowKey, colKey, data)) {
-          return false;
+      if (!resultFound) {
+        if (operator == Operator.MUST_PASS_ALL) {
+          if (filter.filterAllRemaining() || 
+            filter.filter(rowKey, colKey, data)) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("op.MPALL filter(Text, Text, byte[]) will" + 
+                " return true due to subfilter of type " + 
+                filter.getClass().getSimpleName());
+            }
+            result = true;
+            resultFound = true;
+          }
+        } else if (operator == Operator.MUST_PASS_ONE) {
+          if (!filter.filterAllRemaining() && 
+            !filter.filter(rowKey, colKey, data)) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("op.MPONE filter(Text, Text, byte[]) will" + 
+                " return false due to subfilter of type " + 
+                filter.getClass().getSimpleName());
+            }
+            result = false;
+            resultFound = true;
+          }
         }
+      } else if (filter.processAlways()) {
+        filter.filter(rowKey, colKey, data);
       }
     }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("filter(Text, Text, byte[]) returning " + result);
+    }
     return result;
   }
 
@@ -170,18 +253,36 @@ public class RowFilterSet implements RowFilterInterface {
    * {@inheritDoc}
    */
   public boolean filterNotNull(final TreeMap<Text, byte[]> columns) {
-    boolean result = operator == Operator.OR;
+    boolean resultFound = false;
+    boolean result = operator == Operator.MUST_PASS_ONE;
     for (RowFilterInterface filter : filters) {
-      if (operator == Operator.AND) {
-        if (filter.filterAllRemaining() || filter.filterNotNull(columns)) {
-          return true;
-        }
-      } else if (operator == Operator.OR) {
-        if (!filter.filterAllRemaining() && !filter.filterNotNull(columns)) {
-          return false;
+      if (!resultFound) {
+        if (operator == Operator.MUST_PASS_ALL) {
+          if (filter.filterAllRemaining() || filter.filterNotNull(columns)) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("op.MPALL filterNotNull will return true due" + 
+                " to subfilter of type " + filter.getClass().getSimpleName());
+            }
+            result = true;
+            resultFound = true;
+          }
+        } else if (operator == Operator.MUST_PASS_ONE) {
+          if (!filter.filterAllRemaining() && !filter.filterNotNull(columns)) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("op.MPONE filterNotNull will return false due" + 
+                " to subfilter of type " + filter.getClass().getSimpleName());
+            }
+            result = false;
+            resultFound = true;
+          }
         }
+      } else if (filter.processAlways()) {
+        filter.filterNotNull(columns);
       }
     }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("filterNotNull returning " + result);
+    }
     return result;
   }
 
@@ -203,6 +304,10 @@ public class RowFilterSet implements RowFilterInterface {
           filter = (RowFilterInterface) clazz.newInstance();
           filter.readFields(in);
           filters.add(filter);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Successfully read in subfilter of type " + 
+              filter.getClass().getSimpleName());
+          }
         }
       } catch (InstantiationException e) {
         throw new RuntimeException("Failed to deserialize RowFilterInterface.",

+ 39 - 6
src/contrib/hbase/src/test/org/apache/hadoop/hbase/filter/TestPageRowFilter.java

@@ -19,34 +19,67 @@
  */
 package org.apache.hadoop.hbase.filter;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
 import org.apache.hadoop.io.Text;
 
 import junit.framework.TestCase;
 
 public class TestPageRowFilter extends TestCase {
+  
+  RowFilterInterface mainFilter;
+  final int ROW_LIMIT = 3;
+  
+  protected void setUp() throws Exception {
+    super.setUp();
+    mainFilter = new PageRowFilter(ROW_LIMIT);
+  }
+  
   public void testPageSize() throws Exception {
-    final int pageSize = 3;
-    RowFilterInterface filter = new PageRowFilter(pageSize);
-    testFiltersBeyondPageSize(filter, pageSize);
+    pageSizeTests(mainFilter);
+  }
+  
+  public void testSerialization() throws Exception {
+    // Decompose mainFilter to bytes.
+    ByteArrayOutputStream stream = new ByteArrayOutputStream();
+    DataOutputStream out = new DataOutputStream(stream);
+    mainFilter.write(out);
+    out.close();
+    byte[] buffer = stream.toByteArray();
+    
+    // Recompose mainFilter.
+    DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer));
+    RowFilterInterface newFilter = new PageRowFilter();
+    newFilter.readFields(in);
+    
+    // Ensure the serialization preserved the filter by running a full test.
+    pageSizeTests(newFilter);
+  }
+  
+  private void pageSizeTests(RowFilterInterface filter) throws Exception {
+    testFiltersBeyondPageSize(filter, ROW_LIMIT);
     // Test reset works by going in again.
     filter.reset();
-    testFiltersBeyondPageSize(filter, pageSize);
+    testFiltersBeyondPageSize(filter, ROW_LIMIT);
   }
   
   private void testFiltersBeyondPageSize(final RowFilterInterface filter,
-      final int pageSize) {
+    final int pageSize) {
     for (int i = 0; i < (pageSize * 2); i++) {
       Text row = new Text(Integer.toString(i));
       boolean filterOut = filter.filter(row);
       if (!filterOut) {
         assertFalse("Disagrees with 'filter'", filter.filterAllRemaining());
-        filter.acceptedRow(row);
       } else {
         // Once we have all for a page, calls to filterAllRemaining should
         // stay true.
         assertTrue("Disagrees with 'filter'", filter.filterAllRemaining());
         assertTrue(i >= pageSize);
       }
+      filter.rowProcessed(filterOut, row);
     }
   }
 }

+ 86 - 5
src/contrib/hbase/src/test/org/apache/hadoop/hbase/filter/TestRegExpRowFilter.java

@@ -19,16 +19,21 @@
  */
 package org.apache.hadoop.hbase.filter;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.util.Map;
 import java.util.TreeMap;
 
 import junit.framework.TestCase;
 
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.io.Text;
 
 public class TestRegExpRowFilter extends TestCase {
   TreeMap<Text, byte []> colvalues;
-  RowFilterInterface filter;
+  RowFilterInterface mainFilter;
   final char FIRST_CHAR = 'a';
   final char LAST_CHAR = 'e';
   byte [] GOOD_BYTES = "abc".getBytes();
@@ -41,10 +46,43 @@ public class TestRegExpRowFilter extends TestCase {
     for (char c = FIRST_CHAR; c < LAST_CHAR; c++) {
       colvalues.put(new Text(new String(new char [] {c})), GOOD_BYTES);
     }
-    this.filter = new RegExpRowFilter(HOST_PREFIX + ".*", colvalues);
+    this.mainFilter = new RegExpRowFilter(HOST_PREFIX + ".*", colvalues);
   }
   
   public void testRegexOnRow() throws Exception {
+    regexRowTests(mainFilter);
+  }
+
+  public void testRegexOnRowAndColumn() throws Exception {
+    regexRowColumnTests(mainFilter);
+  }
+  
+  public void testFilterNotNull() throws Exception {
+    filterNotNullTests(mainFilter);
+  }
+  
+  public void testSerialization() throws Exception {
+    // Decompose mainFilter to bytes.
+    ByteArrayOutputStream stream = new ByteArrayOutputStream();
+    DataOutputStream out = new DataOutputStream(stream);
+    mainFilter.write(out);
+    out.close();
+    byte[] buffer = stream.toByteArray();
+    
+    // Recompose filter.
+    DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer));
+    RowFilterInterface newFilter = new RegExpRowFilter();
+    newFilter.readFields(in);
+    
+    // Ensure the serialization preserved the filter by running all test.
+    regexRowTests(newFilter);
+    newFilter.reset();
+    regexRowColumnTests(newFilter);
+    newFilter.reset();
+    filterNotNullTests(newFilter);
+  }
+ 
+  private void regexRowTests(RowFilterInterface filter) throws Exception {
     for (char c = FIRST_CHAR; c <= LAST_CHAR; c++) {
       Text t = createRow(c);
       assertFalse("Failed with characer " + c, filter.filter(t));
@@ -54,12 +92,12 @@ public class TestRegExpRowFilter extends TestCase {
       yahooSite, filter.filter(new Text(yahooSite)));
   }
   
-  public void testRegexOnRowAndColumn() throws Exception {
+  private void regexRowColumnTests(RowFilterInterface filter) {
     for (char c = FIRST_CHAR; c <= LAST_CHAR; c++) {
       Text t = createRow(c);
       for (Map.Entry<Text, byte []> e: this.colvalues.entrySet()) {
         assertFalse("Failed on " + c,
-          this.filter.filter(t, e.getKey(), e.getValue()));
+          filter.filter(t, e.getKey(), e.getValue()));
       }
     }
     // Try a row and column I know will pass.
@@ -68,17 +106,60 @@ public class TestRegExpRowFilter extends TestCase {
     Text col = new Text(Character.toString(c));
     assertFalse("Failed with character " + c,
       filter.filter(r, col, GOOD_BYTES));
+    
     // Do same but with bad bytes.
     assertTrue("Failed with character " + c,
       filter.filter(r, col, "badbytes".getBytes()));
+    
     // Do with good bytes but bad column name.  Should not filter out.
     assertFalse("Failed with character " + c,
       filter.filter(r, new Text("badcolumn"), GOOD_BYTES));
+    
     // Good column, good bytes but bad row.
     assertTrue("Failed with character " + c,
       filter.filter(new Text("bad row"), new Text("badcolumn"), GOOD_BYTES));
   }
-  
+ 
+  private void filterNotNullTests(RowFilterInterface filter) throws Exception {
+    // Modify the filter to expect certain columns to be null:
+    // Expecting a row WITH columnKeys: a-d, WITHOUT columnKey: e
+    ((RegExpRowFilter)filter).setColumnFilter(new Text(new String(new char[] { 
+      LAST_CHAR })), null);
+    
+    char secondToLast = (char)(LAST_CHAR - 1);
+    char thirdToLast = (char)(LAST_CHAR - 2);
+    
+    // Modify the row to be missing an expected columnKey (d)
+    colvalues.remove(new Text(new String(new char[] { secondToLast })));
+
+    // Try a row that is missing an expected columnKey.
+    // Testing row with columnKeys: a-c
+    assertTrue("Failed with last columnKey " + thirdToLast, filter.
+      filterNotNull(colvalues));
+
+    // Try a row that has all expected columnKeys, and NO null-expected
+    // columnKeys.
+    // Testing row with columnKeys: a-d
+    colvalues.put(new Text(new String(new char[] { secondToLast })),
+      GOOD_BYTES);
+    assertFalse("Failed with last columnKey " + secondToLast, filter.
+      filterNotNull(colvalues));
+
+    // Try a row that has all expected columnKeys AND a null-expected columnKey.
+    // Testing row with columnKeys: a-e
+    colvalues.put(new Text(new String(new char[] { LAST_CHAR })), GOOD_BYTES);
+    assertTrue("Failed with last columnKey " + LAST_CHAR, filter.
+      filterNotNull(colvalues));
+    
+    // Try a row that has all expected columnKeys and a null-expected columnKey 
+    // that maps to a null value.
+    // Testing row with columnKeys: a-e, e maps to null
+    colvalues.put(new Text(new String(new char[] { LAST_CHAR })), 
+      HConstants.DELETE_BYTES.get());
+    assertFalse("Failed with last columnKey " + LAST_CHAR + " mapping to null.", 
+      filter.filterNotNull(colvalues));
+  }
+
   private Text createRow(final char c) {
     return new Text(HOST_PREFIX + Character.toString(c));
   }