@@ -15,14 +15,26 @@
  */
 package org.apache.hadoop.hbase;

+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.Vector;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.conf.*;
-
-import java.io.*;
-import java.util.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.Text;

 /**
  * HRegion stores data for a certain region of a table. It stores all columns
@@ -974,6 +986,15 @@ public class HRegion implements HConstants {
    */
   public HInternalScannerInterface getScanner(Text[] cols, Text firstRow)
   throws IOException {
+    return getScanner(cols, firstRow, null);
+  }
+
+  /**
+   * Return an iterator that scans over the HRegion, returning the indicated
+   * columns for only the rows that match the data filter. This Iterator must be closed by the caller.
+   */
+  public HInternalScannerInterface getScanner(Text[] cols, Text firstRow, RowFilterInterface filter)
+  throws IOException {
     lock.obtainReadLock();
     try {
       TreeSet<Text> families = new TreeSet<Text>();
@@ -986,7 +1007,7 @@ public class HRegion implements HConstants {
       for (Text family: families) {
         storelist[i++] = stores.get(family);
       }
-      return new HScanner(cols, firstRow, memcache, storelist);
+      return new HScanner(cols, firstRow, memcache, storelist, filter);
     } finally {
       lock.releaseReadLock();
     }
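[Reviewer note, not part of the patch] For reference, a caller-side sketch of the new overload. It assumes HInternalScannerInterface exposes a close() method (implied by "must be closed by the caller" above), that HStoreKey has a no-argument constructor, and that an empty Text passed as firstRow means "start at the first row"; the RowFilterInterface instance is whatever implementation the client supplies.

    // Illustrative sketch only -- not part of this patch.
    static void dumpFilteredRows(HRegion region, Text[] cols, RowFilterInterface filter)
    throws IOException {
      // Empty start row assumed to mean "scan from the first row".
      HInternalScannerInterface scanner = region.getScanner(cols, new Text(""), filter);
      try {
        HStoreKey key = new HStoreKey();                  // assumed no-arg constructor
        TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
        while (scanner.next(key, results)) {              // false once data or filter is exhausted
          for (Map.Entry<Text, byte[]> e: results.entrySet()) {
            System.out.println(key.getRow() + "/" + e.getKey() + ": " + e.getValue().length + " bytes");
          }
          results.clear();                                // next() merges into the map the caller passes in
        }
      } finally {
        scanner.close();                                  // "This Iterator must be closed by the caller"
      }
    }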
@@ -1262,12 +1283,17 @@ public class HRegion implements HConstants {
     private HStoreKey[] keys;
     private boolean wildcardMatch;
     private boolean multipleMatchers;
+    private RowFilterInterface dataFilter;

     /** Create an HScanner with a handle on many HStores. */
     @SuppressWarnings("unchecked")
-    HScanner(Text[] cols, Text firstRow, HMemcache memcache, HStore[] stores)
+    HScanner(Text[] cols, Text firstRow, HMemcache memcache, HStore[] stores, RowFilterInterface filter)
     throws IOException {
       long scanTime = System.currentTimeMillis();
+      this.dataFilter = filter;
+      if (null != dataFilter) {
+        dataFilter.reset();
+      }
       this.scanners = new HInternalScannerInterface[stores.length + 1];
       for(int i = 0; i < this.scanners.length; i++) {
         this.scanners[i] = null;
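[Reviewer note, not part of the patch] The constructor above only calls reset(); the reworked next() in the following hunk drives the remaining filter callbacks: filter(row) against the chosen row key, filterNotNull(columns) and filter(row, column, value) against the merged column data, acceptedRow(row) for rows that survive, and filterAllRemaining() to cut the scan short. The hypothetical page-style filter below is written against just those callbacks; the real RowFilterInterface may declare additional members (Writable plumbing, for instance), so the implements clause is deliberately left off and this is a sketch of the calling contract rather than a drop-in filter.

    // Hypothetical sketch of the callbacks HScanner invokes; not a complete
    // RowFilterInterface implementation.
    class PageLimitSketch {
      private final long pageSize;
      private long rowsAccepted = 0;

      PageLimitSketch(long pageSize) {
        this.pageSize = pageSize;
      }

      /** Called once from the HScanner constructor. */
      public void reset() {
        rowsAccepted = 0;
      }

      /** "Filter whole row by row key?" -- this sketch never rejects on the key. */
      public boolean filter(Text rowKey) {
        return false;
      }

      /** "Filter whole row by column data?" -- this sketch accepts all values. */
      public boolean filter(Text rowKey, Text colKey, byte[] data) {
        return false;
      }

      /** Null-column criteria applied to a row's merged result map. */
      public boolean filterNotNull(TreeMap<Text, byte[]> columns) {
        return false;
      }

      /** Called once per row that survives filtering. */
      public void acceptedRow(Text rowKey) {
        rowsAccepted++;
      }

      /** Returning true stops the scan early (the "page limit" branch in next()). */
      public boolean filterAllRemaining() {
        return rowsAccepted >= pageSize;
      }
    }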
@@ -1335,84 +1361,135 @@ public class HRegion implements HConstants {
       return multipleMatchers;
     }

-    /* (non-Javadoc)
+    /*
+     * (non-Javadoc)
      *
-     * Grab the next row's worth of values. The HScanner will return the most
+     * Grab the next row's worth of values. The HScanner will return the most
      * recent data value for each row that is not newer than the target time.
-     *
-     * @see org.apache.hadoop.hbase.HInternalScannerInterface#next(org.apache.hadoop.hbase.HStoreKey, java.util.TreeMap)
+     *
+     * If a dataFilter is defined, it will be used to skip rows that do not
+     * match its criteria. It may cause the scanner to stop prematurely if it
+     * knows that it will no longer accept the remaining results.
+     *
+     * @see org.apache.hadoop.hbase.HInternalScannerInterface#next(org.apache.hadoop.hbase.HStoreKey,
+     *      java.util.TreeMap)
      */
-    public boolean next(HStoreKey key, TreeMap<Text, byte []> results)
+    public boolean next(HStoreKey key, TreeMap<Text, byte[]> results)
     throws IOException {
-      // Find the lowest-possible key.
-      Text chosenRow = null;
-      long chosenTimestamp = -1;
-      for(int i = 0; i < this.keys.length; i++) {
-        if(scanners[i] != null
-            && (chosenRow == null
-            || (keys[i].getRow().compareTo(chosenRow) < 0)
-            || ((keys[i].getRow().compareTo(chosenRow) == 0)
-            && (keys[i].getTimestamp() > chosenTimestamp)))) {
-
-          chosenRow = new Text(keys[i].getRow());
-          chosenTimestamp = keys[i].getTimestamp();
+      boolean filtered = true;
+      boolean moreToFollow = true;
+      while (filtered && moreToFollow) {
+        // Find the lowest-possible key.
+        Text chosenRow = null;
+        long chosenTimestamp = -1;
+        for (int i = 0; i < this.keys.length; i++) {
+          if (scanners[i] != null &&
+              (chosenRow == null ||
+              (keys[i].getRow().compareTo(chosenRow) < 0) ||
+              ((keys[i].getRow().compareTo(chosenRow) == 0) &&
+              (keys[i].getTimestamp() > chosenTimestamp)))) {
+            chosenRow = new Text(keys[i].getRow());
+            chosenTimestamp = keys[i].getTimestamp();
+          }
         }
-      }

-      // Store the key and results for each sub-scanner. Merge them as appropriate.
-      boolean insertedItem = false;
-      if(chosenTimestamp > 0) {
-        key.setRow(chosenRow);
-        key.setVersion(chosenTimestamp);
-        key.setColumn(new Text(""));
-
-        for(int i = 0; i < scanners.length; i++) {
-          while((scanners[i] != null)
-              && (keys[i].getRow().compareTo(chosenRow) == 0)) {
-            // If we are doing a wild card match or there are multiple matchers
-            // per column, we need to scan all the older versions of this row
-            // to pick up the rest of the family members
-
-            if(!wildcardMatch
-                && !multipleMatchers
-                && (keys[i].getTimestamp() != chosenTimestamp)) {
-              break;
-            }
+        // Filter whole row by row key?
+        filtered = dataFilter != null? dataFilter.filter(chosenRow) : false;
+
+        // Store the key and results for each sub-scanner. Merge them as
+        // appropriate.
+        if (chosenTimestamp > 0 && !filtered) {
+          key.setRow(chosenRow);
+          key.setVersion(chosenTimestamp);
+          key.setColumn(new Text(""));
+
+          for (int i = 0; i < scanners.length && !filtered; i++) {
+
+            while ((scanners[i] != null
+                && !filtered
+                && moreToFollow)
+                && (keys[i].getRow().compareTo(chosenRow) == 0)) {
+              // If we are doing a wild card match or there are multiple
+              // matchers
+              // per column, we need to scan all the older versions of this row
+              // to pick up the rest of the family members
+
+              if (!wildcardMatch
+                  && !multipleMatchers
+                  && (keys[i].getTimestamp() != chosenTimestamp)) {
+                break;
+              }

-            // NOTE: We used to do results.putAll(resultSets[i]);
-            // but this had the effect of overwriting newer
-            // values with older ones. So now we only insert
-            // a result if the map does not contain the key.
-
-            for(Map.Entry<Text, byte []> e: resultSets[i].entrySet()) {
-              if(!results.containsKey(e.getKey())) {
-                results.put(e.getKey(), e.getValue());
-                insertedItem = true;
+              // Filter out null criteria columns that are not null
+              if (dataFilter != null) {
+                filtered = dataFilter.filterNotNull(resultSets[i]);
               }
-            }

-            resultSets[i].clear();
-            if(! scanners[i].next(keys[i], resultSets[i])) {
-              closeScanner(i);
-            }
-          }
+              // NOTE: We used to do results.putAll(resultSets[i]);
+              // but this had the effect of overwriting newer
+              // values with older ones. So now we only insert
+              // a result if the map does not contain the key.
+
+              for (Map.Entry<Text, byte[]> e : resultSets[i].entrySet()) {
+                if (!filtered && moreToFollow &&
+                    !results.containsKey(e.getKey())) {
+                  if (dataFilter != null) {
+                    // Filter whole row by column data?
+                    filtered =
+                      dataFilter.filter(chosenRow, e.getKey(), e.getValue());
+                    if (filtered) {
+                      results.clear();
+                      break;
+                    }
+                  }
+                  results.put(e.getKey(), e.getValue());
+                }
+              }

-          // If the current scanner is non-null AND has a lower-or-equal
-          // row label, then its timestamp is bad. We need to advance it.
+              resultSets[i].clear();
+              if (!scanners[i].next(keys[i], resultSets[i])) {
+                closeScanner(i);
+              }
+            }

-          while((scanners[i] != null)
-              && (keys[i].getRow().compareTo(chosenRow) <= 0)) {
-
-            resultSets[i].clear();
-            if(! scanners[i].next(keys[i], resultSets[i])) {
-              closeScanner(i);
+            // If the current scanner is non-null AND has a lower-or-equal
+            // row label, then its timestamp is bad. We need to advance it.
+            while ((scanners[i] != null) &&
+                (keys[i].getRow().compareTo(chosenRow) <= 0)) {
+              resultSets[i].clear();
+              if (!scanners[i].next(keys[i], resultSets[i])) {
+                closeScanner(i);
+              }
             }
+          }
+        }
+
+        moreToFollow = chosenTimestamp > 0;
+
+        if (dataFilter != null) {
+          if (moreToFollow && !filtered) {
+            dataFilter.acceptedRow(chosenRow);
+          }
+          if (dataFilter.filterAllRemaining()) {
+            moreToFollow = false;
+            LOG.debug("page limit");
+          }
+        }
+      }
+
+      // Make sure scanners closed if no more results
+      if (!moreToFollow) {
+        for (int i = 0; i < scanners.length; i++) {
+          if (null != scanners[i]) {
+            closeScanner(i);
+          }
         }
       }
-      return insertedItem;
+
+      return moreToFollow;
     }

+
     /** Shut down a single scanner */
     void closeScanner(int i) {
       try {