HADOOP-2467 scanner truncates resultset when > 1 column families

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@605811 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack, 17 years ago
parent commit 78fa1fe80b

+ 1 - 0
src/contrib/hbase/CHANGES.txt

@@ -89,6 +89,7 @@ Trunk (unreleased changes)
    HADOOP-2465 When split parent regions are cleaned up, not all the columns are
                deleted
    HADOOP-2468 TestRegionServerExit failed in Hadoop-Nightly #338
+   HADOOP-2467 scanner truncates resultset when > 1 column families
    
   IMPROVEMENTS
    HADOOP-2401 Add convenience put method that takes writable

+ 1 - 1
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java

@@ -592,7 +592,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
     /** {@inheritDoc} */
     @Override
     public String toString() {
-      return "regionname: " + this.regionName.toString() + ", startKey: <" +
+      return "{regionname: " + this.regionName.toString() + ", startKey: <" +
         this.startKey.toString() + ">, server: " + this.server.toString() + "}";
     }
 

+ 16 - 11
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java

@@ -1113,6 +1113,7 @@ public class HRegion implements HConstants {
           continue;
         }
         storelist.add(stores.get(family));
+        
       }
       return new HScanner(cols, firstRow, timestamp,
         storelist.toArray(new HStore [storelist.size()]), filter);
@@ -1296,7 +1297,6 @@ public class HRegion implements HConstants {
     
     try {
       // find the HStore for the column family
-      LOG.info(family);
       HStore store = stores.get(HStoreKey.extractFamily(family));
       // find all the keys that match our criteria
       List<HStoreKey> keys = store.getKeys(new HStoreKey(row, timestamp), ALL_VERSIONS);
@@ -1422,8 +1422,8 @@ public class HRegion implements HConstants {
    * @throws IOException
    */
   private void checkColumn(Text columnName) throws IOException {
-    Text family = new Text(HStoreKey.extractFamily(columnName) + ":");
-    if(! regionInfo.getTableDesc().hasFamily(family)) {
+    Text family = HStoreKey.extractFamily(columnName, true);
+    if (!regionInfo.getTableDesc().hasFamily(family)) {
       throw new IOException("Requested column family " + family 
           + " does not exist in HRegion " + regionInfo.getRegionName()
           + " for table " + regionInfo.getTableDesc().getName());
@@ -1529,14 +1529,21 @@ public class HRegion implements HConstants {
     /** Create an HScanner with a handle on many HStores. */
     @SuppressWarnings("unchecked")
     HScanner(Text[] cols, Text firstRow, long timestamp, HStore[] stores,
-        RowFilterInterface filter) throws IOException {
-
+        RowFilterInterface filter)
+    throws IOException {
       this.scanners = new HInternalScannerInterface[stores.length];
       try {
         for (int i = 0; i < stores.length; i++) {
-          scanners[i] = stores[i].getScanner(timestamp, cols, firstRow, filter);
+          // TODO: The cols passed in here can include columns from other
+          // stores; add filter so only pertinent columns are passed.
+          //
+          // Also, if more than one store involved, need to replicate filters.
+          // At least WhileMatchRowFilter will mess up the scan if only
+          // one shared across many rows. See HADOOP-2467.
+          scanners[i] = stores[i].getScanner(timestamp, cols, firstRow,
+            (i > 0 && filter != null)?
+              (RowFilterInterface)Writables.clone(filter, conf): filter);
         }
-
       } catch(IOException e) {
         for (int i = 0; i < this.scanners.length; i++) {
           if(scanners[i] != null) {
@@ -1546,9 +1553,8 @@ public class HRegion implements HConstants {
         throw e;
       }
 
-//       Advance to the first key in each store.
-//       All results will match the required column-set and scanTime.
-      
+      // Advance to the first key in each store.
+      // All results will match the required column-set and scanTime.
       this.resultSets = new TreeMap[scanners.length];
       this.keys = new HStoreKey[scanners.length];
       for (int i = 0; i < scanners.length; i++) {
@@ -1616,7 +1622,6 @@ public class HRegion implements HConstants {
         // row label, then its timestamp is bad. We need to advance it.
         while ((scanners[i] != null) &&
             (keys[i].getRow().compareTo(chosenRow) <= 0)) {
-          
           resultSets[i].clear();
           if (!scanners[i].next(keys[i], resultSets[i])) {
             closeScanner(i);

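The heart of the HRegion change is the per-store Writables.clone of the row filter. To make the failure mode concrete, here is a minimal, dependency-free sketch of it; WhileMatchStopFilter and the row arrays are illustrative stand-ins, not the HBase classes. A stateful stop filter shared across two store scanners latches as soon as the first store runs past the stop row and truncates the second store's results; a fresh copy per store does not.

public class SharedFilterSketch {
  /** Latches permanently once it sees a row at or past the stop row. */
  static class WhileMatchStopFilter {
    private final String stopRow;
    private boolean stopped = false;
    WhileMatchStopFilter(String stopRow) { this.stopRow = stopRow; }
    boolean filterAllRemaining(String row) {
      if (row.compareTo(stopRow) >= 0) { stopped = true; }
      return stopped;
    }
    WhileMatchStopFilter copy() { return new WhileMatchStopFilter(stopRow); }
  }

  /** Returns how many rows the store hands back before the filter stops it. */
  static int scan(String[] storeRows, WhileMatchStopFilter filter) {
    int returned = 0;
    for (String row : storeRows) {
      if (filter.filterAllRemaining(row)) {
        break;  // the filter told this scanner to stop
      }
      returned++;
    }
    return returned;
  }

  public static void main(String[] args) {
    String[] storeA = {"aaa", "aab", "aac", "bbb"};  // 'bbb' is past the stop row
    String[] storeB = {"aaa", "aab", "aac"};

    // Shared instance (the bug): storeA trips the latch at 'bbb', so the
    // subsequent scan of storeB returns nothing.
    WhileMatchStopFilter shared = new WhileMatchStopFilter("aad");
    System.out.println(scan(storeA, shared) + ", " + scan(storeB, shared));  // 3, 0

    // Per-store copies (the fix, done in HScanner via Writables.clone):
    WhileMatchStopFilter original = new WhileMatchStopFilter("aad");
    WhileMatchStopFilter copyForB = original.copy();
    System.out.println(scan(storeA, original) + ", " + scan(storeB, copyForB));  // 3, 3
  }
}
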
+ 0 - 1
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java

@@ -1848,7 +1848,6 @@ class HStore implements HConstants {
         this.readers = new MapFile.Reader[storefiles.size()];
         
         // Most recent map file should be first
-        
         int i = readers.length - 1;
         for(HStoreFile curHSF: storefiles.values()) {
           readers[i--] = curHSF.getReader(fs, bloomFilter);

+ 9 - 0
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreFile.java

@@ -815,6 +815,15 @@ public class HStoreFile implements HConstants, WritableComparable {
       throws IOException {
         super(fs, dirName, conf);
         this.bloomFilter = filter;
+        // Force reading of the mapfile index by calling midKey.
+        // Reading the index brings it into memory here on the client
+        // and then closes the index file, freeing up a socket
+        // connection and resources in the datanode.
+        // Usually the first access on a MapFile.Reader loads the
+        // index, but we force the issue in HStoreFile MapFiles
+        // because an access may not happen for some time; meantime
+        // we are using up datanode resources.  See HADOOP-2341.
+        midKey();
       }
       
       /** {@inheritDoc} */

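The comment above describes the trick: calling midKey() right away forces the MapFile index into client memory so the datanode-side resources are released immediately rather than on the first lazy access. A hedged sketch of the same idea against a plain Hadoop MapFile; the path is illustrative and the file must already exist.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.MapFile;

public class EagerIndexLoadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // "/tmp/example.mapfile" is a placeholder for an existing MapFile directory.
    MapFile.Reader reader = new MapFile.Reader(fs, "/tmp/example.mapfile", conf);
    // midKey() reads the in-memory index immediately, which closes the
    // index file and releases the datanode connection right away
    // instead of holding it until the first lookup (see HADOOP-2341).
    System.out.println("mid key: " + reader.midKey());
    reader.close();
  }
}
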
+ 24 - 15
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java

@@ -27,6 +27,8 @@ import java.io.*;
  * A Key for a stored row
  */
 public class HStoreKey implements WritableComparable {
+  public static final char COLUMN_FAMILY_DELIMITER = ':';
+  
   // TODO: Move these utility methods elsewhere (To a Column class?).
   /**
    * Extracts the column family name from a column
@@ -83,7 +85,13 @@ public class HStoreKey implements WritableComparable {
   
   private static int getColonOffset(final Text col)
   throws InvalidColumnNameException {
-    int offset = col.find(":");
+    int offset = -1;
+    for (int i = 0; i < col.getLength(); i++) {
+      if (col.charAt(i) == COLUMN_FAMILY_DELIMITER) {
+        offset = i;
+        break;
+      }
+    }
     if(offset < 0) {
       throw new InvalidColumnNameException(col + " is missing the colon " +
         "family/qualifier separator");
@@ -294,23 +302,24 @@ public class HStoreKey implements WritableComparable {
 
   // Comparable
 
-  /** {@inheritDoc} */
   public int compareTo(Object o) {
     HStoreKey other = (HStoreKey) o;
     int result = this.row.compareTo(other.row);
-    if(result == 0) {
-      result = this.column.compareTo(other.column);
-      if(result == 0) {
-        // The below older timestamps sorting ahead of newer timestamps looks
-        // wrong but it is intentional.  This way, newer timestamps are first
-        // found when we iterate over a memcache and newer versions are the
-        // first we trip over when reading from a store file.
-        if(this.timestamp < other.timestamp) {
-          result = 1;
-        } else if(this.timestamp > other.timestamp) {
-          result = -1;
-        }
-      }
+    if (result != 0) {
+      return result;
+    }
+    result = this.column.compareTo(other.column);
+    if (result != 0) {
+      return result;
+    }
+    // The below older timestamps sorting ahead of newer timestamps looks
+    // wrong but it is intentional. This way, newer timestamps are first
+    // found when we iterate over a memcache and newer versions are the
+    // first we trip over when reading from a store file.
+    if (this.timestamp < other.timestamp) {
+      result = 1;
+    } else if (this.timestamp > other.timestamp) {
+      result = -1;
     }
     return result;
   }

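The flattened compareTo keeps the old nested version's ordering: row ascending, then column ascending, then timestamp descending. Because the inverted timestamp sort is easy to misread, here is a small stand-alone sketch of the same three-level ordering; the Cell record is an illustrative stand-in for HStoreKey, not HBase code.

import java.util.Arrays;
import java.util.Comparator;

public class KeyOrderSketch {
  record Cell(String row, String column, long timestamp) {}

  public static void main(String[] args) {
    // Row and column ascending; timestamp reversed so newer versions sort first.
    Comparator<Cell> order = Comparator
        .comparing(Cell::row)
        .thenComparing(Cell::column)
        .thenComparing(Comparator.comparingLong(Cell::timestamp).reversed());

    Cell[] cells = {
        new Cell("row1", "info:a", 100L),
        new Cell("row1", "info:a", 300L),  // newest version of the same cell
        new Cell("row1", "info:a", 200L),
    };
    Arrays.sort(cells, order);
    // Prints timestamps 300, 200, 100: the newest version is found first,
    // exactly as the comment in compareTo explains.
    for (Cell c : cells) {
      System.out.println(c);
    }
  }
}
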
+ 3 - 3
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java

@@ -426,7 +426,7 @@ public class HTable implements HConstants {
    */
   public HScannerInterface obtainScanner(Text[] columns, Text startRow)
   throws IOException {
-    return obtainScanner(columns, startRow, System.currentTimeMillis(), null);
+    return obtainScanner(columns, startRow, HConstants.LATEST_TIMESTAMP, null);
   }
   
   /** 
@@ -466,7 +466,7 @@ public class HTable implements HConstants {
   public HScannerInterface obtainScanner(Text[] columns, Text startRow,
       RowFilterInterface filter)
   throws IOException { 
-    return obtainScanner(columns, startRow, System.currentTimeMillis(), filter);
+    return obtainScanner(columns, startRow, HConstants.LATEST_TIMESTAMP, filter);
   }
 
   /** 
@@ -490,7 +490,7 @@ public class HTable implements HConstants {
       final Text startRow, final Text stopRow)
   throws IOException {
     return obtainScanner(columns, startRow, stopRow,
-      System.currentTimeMillis());
+      HConstants.LATEST_TIMESTAMP);
   }
 
   /** 

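The switch from System.currentTimeMillis() to HConstants.LATEST_TIMESTAMP matters because a scanner capped at "now" silently drops any cell whose timestamp is newer than the instant the scanner was opened, e.g. an application-assigned timestamp. A tiny sketch of the two visibility checks, assuming LATEST_TIMESTAMP is defined as Long.MAX_VALUE:

public class TimestampCapSketch {
  public static void main(String[] args) {
    long scannerOpenedAt = System.currentTimeMillis();  // the old cap: "now"
    long cellTimestamp = scannerOpenedAt + 5_000L;      // app-assigned future stamp

    // Old behavior: the cell is invisible because its stamp exceeds the cap.
    boolean visibleOld = cellTimestamp <= scannerOpenedAt;  // false
    // New behavior: LATEST_TIMESTAMP (assumed Long.MAX_VALUE) admits everything.
    boolean visibleNew = cellTimestamp <= Long.MAX_VALUE;   // true

    System.out.println("old cap: " + visibleOld + ", new cap: " + visibleNew);
  }
}
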
+ 22 - 1
src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/Writables.java

@@ -27,7 +27,10 @@ import java.io.UnsupportedEncodingException;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.ReflectionUtils;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 
@@ -90,7 +93,25 @@ public class Writables {
       in.close();
     }
   }
-  
+
+  /**
+   * Make a copy of a writable object using serialization to a buffer.
+   * Copied from WritableUtils, except the <code>conf</code> parameter is a
+   * Configuration rather than a JobConf (needn't be a JobConf -- HADOOP-2469).
+   * @param orig The object to copy
+   * @return The copied object
+   */
+  public static Writable clone(Writable orig, Configuration conf) {
+    try {
+      Writable newInst =
+        (Writable)ReflectionUtils.newInstance(orig.getClass(), conf);
+      WritableUtils.cloneInto(newInst, orig);
+      return newInst;
+    } catch (IOException e) {
+      throw new RuntimeException("Error writing/reading clone buffer", e);
+    }
+  }
+
   /**
    * @param bytes
    * @return A HRegionInfo instance built out of passed <code>bytes</code>.

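A short usage sketch for the new clone helper, mirroring the call in HScanner; the filter classes match those used in TestScanner2 below, though their package paths are assumed here. Note that the Writable being cloned needs a no-arg constructor for ReflectionUtils.newInstance and WritableUtils.cloneInto to work.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.filter.StopRowFilter;
import org.apache.hadoop.hbase.filter.WhileMatchRowFilter;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.Text;

public class CloneFilterSketch {
  public static void main(String[] args) {
    Configuration conf = new HBaseConfiguration();
    RowFilterInterface original =
        new WhileMatchRowFilter(new StopRowFilter(new Text("aad")));
    // Each store scanner past the first gets its own deserialized copy,
    // so stateful filters cannot latch across stores (HADOOP-2467).
    RowFilterInterface perStoreCopy =
        (RowFilterInterface) Writables.clone(original, conf);
    System.out.println(original != perStoreCopy);  // true: independent state
  }
}
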
+ 14 - 2
src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java

@@ -77,8 +77,7 @@ public abstract class HBaseTestCase extends TestCase {
   private void init() {
     conf = new HBaseConfiguration();
     try {
-      START_KEY =
-        new String(START_KEY_BYTES, HConstants.UTF8_ENCODING) + PUNCTUATION;
+      START_KEY = new String(START_KEY_BYTES, HConstants.UTF8_ENCODING) + PUNCTUATION;
     } catch (UnsupportedEncodingException e) {
       fail();
     }
@@ -125,10 +124,23 @@ public abstract class HBaseTestCase extends TestCase {
           null), fs, conf, info, null, null);
   }
   
+  /**
+   * Create a descriptor for a table named <code>name</code>, with
+   * {@link COLUMNS} for families.
+   * @param name Name to give the table.
+   * @return Table descriptor.
+   */
   protected HTableDescriptor createTableDescriptor(final String name) {
     return createTableDescriptor(name, MAXVERSIONS);
   }
   
+  /**
+   * Create a descriptor for a table named <code>name</code>, with
+   * {@link COLUMNS} for families.
+   * @param name Name to give the table.
+   * @param versions How many versions to allow per column.
+   * @return Table descriptor.
+   */
   protected HTableDescriptor createTableDescriptor(final String name,
       final int versions) {
     HTableDescriptor htd = new HTableDescriptor(name);

+ 48 - 0
src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java

@@ -71,6 +71,54 @@ public class TestScanner2 extends HBaseClusterTestCase {
     }
   }
 
+  /**
+   * Test for the HADOOP-2467 fix.  If scanning more than one column family,
+   * filters such as the {@link WhileMatchRowFilter} could prematurely
+   * shut down scanning when one of the stores started returning filter = true.
+   * @throws MasterNotRunningException
+   * @throws IOException
+   */
+  public void testScanningMultipleFamiliesOfDifferentVintage()
+  throws MasterNotRunningException, IOException {
+    Text tableName = new Text(getName());
+    final Text [] families = createTable(new HBaseAdmin(this.conf), tableName);
+    HTable table = new HTable(this.conf, tableName);
+    HScannerInterface scanner = null;
+    try {
+      long time = System.currentTimeMillis();
+      LOG.info("Current time " + time);
+      for (int i = 0; i < families.length; i++) {
+        final byte [] lastKey = new byte [] {'a', 'a', (byte)('b' + i)};
+        Incommon inc = new HTableIncommon(table);
+        addContent(inc, families[i].toString(),
+          START_KEY_BYTES, new Text(lastKey), time + (1000 * i));
+        // Add to the first store a record, 'bbb', that is beyond the stop
+        // row used below when setting up the scanner filter, 'aad'.  The
+        // store scanner reaching 'bbb' was flipping the switch in the shared
+        // StopRowFilter, stopping the return of the rest of the other
+        // stores' content.
+        if (i == 0) {
+          long id = inc.startBatchUpdate(new Text("bbb"));
+          inc.put(id, families[0], "bbb".getBytes());
+          inc.commit(id);
+        }
+      }
+      RowFilterInterface f =
+        new WhileMatchRowFilter(new StopRowFilter(new Text("aad")));
+      scanner = table.obtainScanner(families, HConstants.EMPTY_START_ROW,
+        HConstants.LATEST_TIMESTAMP, f);
+      int count = 0;
+      for (Map.Entry<HStoreKey, SortedMap<Text, byte []>> e: scanner) {
+        count++;
+      }
+      // Should get back 3 rows: aaa, aab, and aac.
+      assertEquals(3, count);
+    } finally {
+      scanner.close();
+      table.close();
+    }
+  }
+
   /**
    * @throws Exception
    */