@@ -28,11 +28,17 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.MapFile;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import org.onelab.filter.*;
 
 /*******************************************************************************
  * HStore maintains a bunch of data files. It is responsible for maintaining
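
For reference, the org.onelab.filter package imported above supplies the Filter
hierarchy (BloomFilter, CountingBloomFilter, RetouchedBloomFilter) and the Key
class that the rest of this patch builds on. A minimal sketch of the API exactly
as the patch exercises it; the vector size and hash count are illustrative
values, not ones taken from the patch:

    import org.onelab.filter.BloomFilter;
    import org.onelab.filter.Key;

    // Illustrative parameters; the real ones come from the column family's
    // BloomFilterDescriptor (vectorSize, nbHash).
    BloomFilter filter = new BloomFilter(10000, 4);

    Key k = new Key("row1colfamily:qualifier".getBytes());
    filter.add(k);                            // record a key at write time
    filter.membershipTest(k);                 // true: present, or a false positive
    filter.membershipTest(new Key("absent".getBytes())); // false: definitely absent
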
@@ -49,6 +55,8 @@ class HStore implements HConstants {
   static final String WORKING_COMPACTION = "compaction.inprogress";
   static final String COMPACTION_TO_REPLACE = "toreplace";
   static final String COMPACTION_DONE = "done";
+
+  private static final String BLOOMFILTER_FILE_NAME = "filter";
 
   Path dir;
   Text regionName;
@@ -60,6 +68,8 @@ class HStore implements HConstants {
   Path mapdir;
   Path compactdir;
   Path loginfodir;
+  Path filterDir;
+  Filter bloomFilter;
 
   Integer compactLock = 0;
   Integer flushLock = 0;
@@ -135,6 +145,16 @@ class HStore implements HConstants {
     fs.mkdirs(mapdir);
     this.loginfodir = HStoreFile.getInfoDir(dir, regionName, familyName);
     fs.mkdirs(loginfodir);
+
+    if(family.bloomFilter == null) {
+      this.filterDir = null;
+      this.bloomFilter = null;
+
+    } else {
+      this.filterDir = HStoreFile.getFilterDir(dir, regionName, familyName);
+      fs.mkdirs(filterDir);
+      loadOrCreateBloomFilter();
+    }
 
     if(LOG.isDebugEnabled()) {
       LOG.debug("starting HStore for " + regionName + "/"+ familyName);
@@ -182,6 +202,9 @@ class HStore implements HConstants {
       }
     }
 
+    // If a bloom filter is enabled, try to read it in.
+    // If it doesn't exist, create it.
+
     // Read the reconstructionLog to see whether we need to build a brand-new
     // MapFile out of non-flushed log entries.
     //
@@ -257,11 +280,163 @@ class HStore implements HConstants {
     for(Map.Entry<Long, HStoreFile> e: mapFiles.entrySet()) {
       // TODO - is this really necessary? Don't I do this inside compact()?
       maps.put(e.getKey(),
-          new MapFile.Reader(fs, e.getValue().getMapFilePath().toString(), conf));
+          getMapFileReader(e.getValue().getMapFilePath().toString()));
     }
 
     LOG.info("HStore online for " + this.regionName + "/" + this.familyName);
   }
+
+  //////////////////////////////////////////////////////////////////////////////
+  // Bloom filters
+  //////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Called by constructor if a bloom filter is enabled for this column family.
+   * If the HStore already exists, it will read in the bloom filter saved
+   * previously. Otherwise, it will create a new bloom filter.
+   */
+  private void loadOrCreateBloomFilter() throws IOException {
+    Path filterFile = new Path(filterDir, BLOOMFILTER_FILE_NAME);
+    if(fs.exists(filterFile)) {
+      switch(family.bloomFilter.filterType) {
+
+      case BloomFilterDescriptor.BLOOMFILTER:
+        bloomFilter = new BloomFilter();
+        break;
+
+      case BloomFilterDescriptor.COUNTING_BLOOMFILTER:
+        bloomFilter = new CountingBloomFilter();
+        break;
+
+      case BloomFilterDescriptor.RETOUCHED_BLOOMFILTER:
+        bloomFilter = new RetouchedBloomFilter();
+      }
+      FSDataInputStream in = fs.open(filterFile);
+      bloomFilter.readFields(in);
+      in.close();
+
+    } else {
+      switch(family.bloomFilter.filterType) {
+
+      case BloomFilterDescriptor.BLOOMFILTER:
+        bloomFilter = new BloomFilter(family.bloomFilter.vectorSize,
+            family.bloomFilter.nbHash);
+        break;
+
+      case BloomFilterDescriptor.COUNTING_BLOOMFILTER:
+        bloomFilter = new CountingBloomFilter(family.bloomFilter.vectorSize,
+            family.bloomFilter.nbHash);
+        break;
+
+      case BloomFilterDescriptor.RETOUCHED_BLOOMFILTER:
+        bloomFilter = new RetouchedBloomFilter(family.bloomFilter.vectorSize,
+            family.bloomFilter.nbHash);
+      }
+    }
+  }
+
+  /**
+   * Flushes bloom filter to disk
+   *
+   * @throws IOException
+   */
+  private void flushBloomFilter() throws IOException {
+    FSDataOutputStream out =
+      fs.create(new Path(filterDir, BLOOMFILTER_FILE_NAME));
+
+    bloomFilter.write(out);
+    out.close();
+  }
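
Filter is a Writable, so flushBloomFilter and loadOrCreateBloomFilter together
form a plain write/readFields round-trip. A sketch of that round-trip, assuming
HStore's fs, filterDir, and bloomFilter fields; note that it is the stream, not
the FileSystem, that gets closed:

    // Persist: serialize the filter's parameters and bit vector.
    Path filterFile = new Path(filterDir, BLOOMFILTER_FILE_NAME);
    FSDataOutputStream out = fs.create(filterFile);
    bloomFilter.write(out);
    out.close();

    // Reload: the concrete type must match the family's filterType.
    Filter reloaded = new BloomFilter();
    FSDataInputStream in = fs.open(filterFile);
    reloaded.readFields(in);
    in.close();
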
+
+  /** Generates a bloom filter key from the row and column keys */
+  Key getBloomFilterKey(HStoreKey k) {
+    StringBuilder s = new StringBuilder(k.getRow().toString());
+    s.append(k.getColumn().toString());
+    return new Key(s.toString().getBytes());
+  }
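
Note that only the row and column participate in the filter key; the timestamp
component of an HStoreKey is left out, so every version of a cell hashes to the
same filter positions. A small sketch, assuming a hypothetical
(row, column, timestamp) HStoreKey constructor:

    // Two keys differing only in timestamp produce identical filter keys.
    HStoreKey k1 = new HStoreKey(new Text("row1"), new Text("info:a"), 1000L);
    HStoreKey k2 = new HStoreKey(new Text("row1"), new Text("info:a"), 2000L);
    Key f1 = getBloomFilterKey(k1);   // bytes of "row1info:a"
    Key f2 = getBloomFilterKey(k2);   // same bytes, so same filter positions
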
+
+  /**
+   * Extends MapFile.Reader and overrides get and getClosest to consult the
+   * bloom filter before attempting to read from disk.
+   */
+  private class BloomFilterReader extends MapFile.Reader {
+
+    BloomFilterReader(FileSystem fs, String dirName, Configuration conf)
+        throws IOException {
+      super(fs, dirName, conf);
+    }
+
+    @Override
+    public Writable get(WritableComparable key, Writable val) throws IOException {
+      // Note - the key being passed to us is always a HStoreKey
+
+      if(bloomFilter.membershipTest(getBloomFilterKey((HStoreKey)key))) {
+        return super.get(key, val);
+      }
+      return null;
+    }
+
+    @Override
+    public WritableComparable getClosest(WritableComparable key, Writable val)
+        throws IOException {
+      // Note - the key being passed to us is always a HStoreKey
+
+      if(bloomFilter.membershipTest(getBloomFilterKey((HStoreKey)key))) {
+        return super.getClosest(key, val);
+      }
+      return null;
+    }
+  }
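
The payoff on the read path: a negative membershipTest answers the lookup with
no disk access at all, and since a bloom filter never reports false negatives
for keys that were added, any key actually present in the MapFile still reaches
super.get. A false positive merely falls through to super.get, which returns
null for a missing key anyway, so results are identical to a plain
MapFile.Reader. A usage sketch (the two-argument HStoreKey constructor is an
assumption):

    MapFile.Reader reader = getMapFileReader(mapfilePath); // BloomFilterReader if enabled
    HStoreKey key = new HStoreKey(new Text("row1"), new Text("info:a"));
    BytesWritable val = new BytesWritable();
    if (reader.get(key, val) != null) {
      // on disk: the filter said "maybe" and the MapFile had it
    } else {
      // the filter said "no" with zero I/O, or a false positive missed on disk
    }
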
+
+  /**
+   * Extends MapFile.Writer and overrides append, so that whenever a MapFile
+   * is written to, the key is added to the bloom filter.
+   */
+  private class BloomFilterWriter extends MapFile.Writer {
+
+    @SuppressWarnings("unchecked")
+    BloomFilterWriter(Configuration conf, FileSystem fs, String dirName,
+        Class keyClass, Class valClass, SequenceFile.CompressionType compression)
+        throws IOException {
+      super(conf, fs, dirName, keyClass, valClass, compression);
+    }
+
+    @Override
+    public void append(WritableComparable key, Writable val) throws IOException {
+      // Note - the key being passed to us is always a HStoreKey
+
+      bloomFilter.add(getBloomFilterKey((HStoreKey)key));
+      super.append(key, val);
+    }
+  }
+
+  /**
+   * Get a MapFile reader.
+   * This allows us to substitute a BloomFilterReader if a bloom filter is enabled.
+   */
+  MapFile.Reader getMapFileReader(String dirName) throws IOException {
+    if(bloomFilter != null) {
+      return new BloomFilterReader(fs, dirName, conf);
+    }
+    return new MapFile.Reader(fs, dirName, conf);
+  }
+
+  /**
+   * Get a MapFile writer.
+   * This allows us to substitute a BloomFilterWriter if a bloom filter is enabled.
+   */
+  MapFile.Writer getMapFileWriter(String dirName) throws IOException {
+    if(bloomFilter != null) {
+      return new BloomFilterWriter(conf, fs, dirName, HStoreKey.class,
+          BytesWritable.class, compression);
+    }
+    return new MapFile.Writer(conf, fs, dirName, HStoreKey.class,
+        BytesWritable.class, compression);
+  }
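
These two factories are the seam that keeps the rest of the file
filter-agnostic: every direct MapFile.Reader and MapFile.Writer construction
below is rewritten to go through them, and a null bloomFilter (a family with no
filter declared) degrades to the stock classes. The before/after shape of a
typical call site, abstracted from the hunks that follow:

    // Before: constructed directly, bypassing the bloom filter.
    readers[pos] = new MapFile.Reader(fs, hsf.getMapFilePath().toString(), conf);

    // After: HStore decides, returning a BloomFilterReader when the family
    // declares a filter and a plain MapFile.Reader otherwise.
    readers[pos] = getMapFileReader(hsf.getMapFilePath().toString());
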
+
+  //////////////////////////////////////////////////////////////////////////////
+  // End bloom filters
+  //////////////////////////////////////////////////////////////////////////////
 
   /**
    * Turn off all the MapFile readers
@@ -327,8 +502,7 @@ class HStore implements HConstants {
       LOG.debug("map file is: " + mapfile.toString());
     }
 
-    MapFile.Writer out = new MapFile.Writer(conf, fs, mapfile.toString(),
-        HStoreKey.class, BytesWritable.class, compression);
+    MapFile.Writer out = getMapFileWriter(mapfile.toString());
 
     try {
       for (Map.Entry<HStoreKey, BytesWritable> es: inputCache.entrySet()) {
@@ -352,14 +526,20 @@ class HStore implements HConstants {
       LOG.debug("writing log cache flush id");
     }
     flushedFile.writeInfo(fs, logCacheFlushId);
+
+    // C. Flush the bloom filter if any
+
+    if(bloomFilter != null) {
+      flushBloomFilter();
+    }
 
-    // C. Finally, make the new MapFile available.
+    // D. Finally, make the new MapFile available.
 
     if(addToAvailableMaps) {
      this.lock.obtainWriteLock();
 
      try {
-        maps.put(logCacheFlushId, new MapFile.Reader(fs, mapfile.toString(), conf));
+        maps.put(logCacheFlushId, getMapFileReader(mapfile.toString()));
         mapFiles.put(logCacheFlushId, flushedFile);
         if(LOG.isDebugEnabled()) {
           LOG.debug("HStore available for " + this.regionName + "/"
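
A note on why step C exists: the in-memory filter already contains every key in
the newly flushed MapFile, because BloomFilterWriter.append added them as the
cache was written out. Step C is about durability, persisting the filter so that
a restarted HStore reloads one consistent with the MapFiles on disk. A filter
that is older than a MapFile is the dangerous case, since it yields false
negatives, the one error a bloom filter must never make. A hypothetical sketch
of that failure mode:

    // Hypothetical: a filter persisted before an append it never saw.
    BloomFilter stale = new BloomFilter(10000, 4);
    Key k = new Key("row1info:a".getBytes());  // key present only in the MapFile
    stale.membershipTest(k);                   // false, so get() would skip a real row
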
@@ -466,9 +646,8 @@ class HStore implements HConstants {
 
     // Step through them, writing to the brand-new TreeMap
 
-    MapFile.Writer compactedOut = new MapFile.Writer(conf, fs,
-        compactedOutputFile.getMapFilePath().toString(), HStoreKey.class,
-        BytesWritable.class, compression);
+    MapFile.Writer compactedOut =
+      getMapFileWriter(compactedOutputFile.getMapFilePath().toString());
 
     try {
 
@@ -491,7 +670,7 @@ class HStore implements HConstants {
       int pos = 0;
       for(Iterator<HStoreFile> it = toCompactFiles.iterator(); it.hasNext(); ) {
         HStoreFile hsf = it.next();
-        readers[pos] = new MapFile.Reader(fs, hsf.getMapFilePath().toString(), conf);
+        readers[pos] = getMapFileReader(hsf.getMapFilePath().toString());
         keys[pos] = new HStoreKey();
         vals[pos] = new BytesWritable();
         done[pos] = false;
@@ -772,8 +951,8 @@ class HStore implements HConstants {
       }
 
       mapFiles.put(orderVal, finalCompactedFile);
-      maps.put(orderVal, new MapFile.Reader(fs,
-          finalCompactedFile.getMapFilePath().toString(), conf));
+      maps.put(orderVal, getMapFileReader(
+          finalCompactedFile.getMapFilePath().toString()));
 
     } finally {
 
@@ -988,7 +1167,7 @@ class HStore implements HConstants {
     int i = readers.length - 1;
     for(Iterator<HStoreFile> it = mapFiles.values().iterator(); it.hasNext(); ) {
       HStoreFile curHSF = it.next();
-      readers[i--] = new MapFile.Reader(fs, curHSF.getMapFilePath().toString(), conf);
+      readers[i--] = getMapFileReader(curHSF.getMapFilePath().toString());
     }
 
     this.keys = new HStoreKey[readers.length];