@@ -30,6 +30,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.TreeMap;
 import java.util.Vector;
+import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -42,6 +43,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.MapFile;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.StringUtils;
 import org.onelab.filter.BloomFilter;
@@ -92,6 +94,8 @@ class HStore implements HConstants {
   Random rand = new Random();
 
   private long maxSeqId;
+
+  private int compactionThreshold;
 
   /**
    * An HStore is a set of zero or more MapFiles, which stretch backwards over
@@ -164,7 +168,7 @@ class HStore implements HConstants {
 
     if(LOG.isDebugEnabled()) {
       LOG.debug("starting " + this.storeName +
-        ((reconstructionLog == null)?
+        ((reconstructionLog == null || !fs.exists(reconstructionLog))?
         " (no reconstruction log)": " with reconstruction log: " +
         reconstructionLog.toString()));
     }
@@ -215,19 +219,19 @@ class HStore implements HConstants {
     }
 
     doReconstructionLog(reconstructionLog, maxSeqId);
-    this.maxSeqId += 1;
-
-    // Compact all the MapFiles into a single file. The resulting MapFile
-    // should be "timeless"; that is, it should not have an associated seq-ID,
-    // because all log messages have been reflected in the TreeMaps at this
-    // point.
-    //
-    // TODO: Only do the compaction if we are over a threshold, not
-    // every time. Not necessary if only two or three store files. Fix after
-    // revamp of compaction.
-    if(storefiles.size() > 1) {
-      compactHelper(true);
-    }
+
+    // By default, we compact if an HStore has more than
+    // "hbase.hstore.compactionThreshold" map files (three, by default).
+    this.compactionThreshold =
+      conf.getInt("hbase.hstore.compactionThreshold", 3);
+
+    // We used to compact in here before bringing the store online. Instead,
+    // get it online quickly even if it needs compactions, so we can start
+    // taking updates as soon as possible (once online, the store can take
+    // updates even during a compaction).
+
+    // Move maxSeqId on by one. Why here? And not in HRegion?
+    this.maxSeqId += 1;
 
     // Finally, start up all the map readers! (There should be just one at this
     // point, as we've compacted them all.)
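The threshold introduced above is a plain Hadoop configuration property. A minimal sketch, not part of the patch, of how a deployment could raise it; the property name and its default come from the hunk above, everything else is invented:

import org.apache.hadoop.conf.Configuration;

public class CompactionThresholdExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Compact only once five store files accumulate, instead of three.
    conf.setInt("hbase.hstore.compactionThreshold", 5);
    System.out.println(conf.getInt("hbase.hstore.compactionThreshold", 3));
  }
}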
@@ -253,10 +257,6 @@ class HStore implements HConstants {
       final long maxSeqID)
   throws UnsupportedEncodingException, IOException {
     if (reconstructionLog == null || !fs.exists(reconstructionLog)) {
-      if (reconstructionLog != null && !fs.exists(reconstructionLog)) {
-        LOG.warn("Passed reconstruction log " + reconstructionLog +
-          " does not exist");
-      }
       // Nothing to do.
       return;
     }
@@ -397,15 +397,18 @@ class HStore implements HConstants {
    * Close all the MapFile readers
    * @throws IOException
    */
-  void close() throws IOException {
+  Vector<HStoreFile> close() throws IOException {
+    Vector<HStoreFile> result = null;
     this.lock.obtainWriteLock();
     try {
       for (MapFile.Reader reader: this.readers.values()) {
         reader.close();
       }
       this.readers.clear();
+      result = new Vector<HStoreFile>(storefiles.values());
       this.storefiles.clear();
       LOG.info("closed " + this.storeName);
+      return result;
     } finally {
       this.lock.releaseWriteLock();
     }
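close() now hands back the set of HStoreFiles the store held, instead of returning nothing. A hedged caller sketch, assuming the same package as HStore; the helper below is invented, but the close() signature is the one added above:

import java.io.IOException;
import java.util.Vector;

class CloseExample {
  // Capture what the store released at shutdown, e.g. for logging or for
  // handing the file list back to the enclosing region.
  static Vector<HStoreFile> closeAndReport(HStore store) throws IOException {
    Vector<HStoreFile> files = store.close();
    System.out.println("closed store; " + files.size() + " file(s) released");
    return files;
  }
}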
@@ -428,16 +431,15 @@ class HStore implements HConstants {
    *
    * @param inputCache memcache to flush
    * @param logCacheFlushId flush sequence number
-   * @return Vector of all the HStoreFiles in use
    * @throws IOException
    */
-  Vector<HStoreFile> flushCache(final TreeMap<HStoreKey, byte []> inputCache,
+  void flushCache(final TreeMap<HStoreKey, byte []> inputCache,
     final long logCacheFlushId)
   throws IOException {
-    return flushCacheHelper(inputCache, logCacheFlushId, true);
+    flushCacheHelper(inputCache, logCacheFlushId, true);
   }
 
-  Vector<HStoreFile> flushCacheHelper(TreeMap<HStoreKey, byte []> inputCache,
+  void flushCacheHelper(TreeMap<HStoreKey, byte []> inputCache,
     long logCacheFlushId, boolean addToAvailableMaps)
   throws IOException {
     synchronized(flushLock) {
@@ -447,12 +449,31 @@ class HStore implements HConstants {
       String name = flushedFile.toString();
       MapFile.Writer out = flushedFile.getWriter(this.fs, this.compression,
         this.bloomFilter);
+
+      // hbase.hstore.compact.on.flush=true enables picking up an existing
+      // HStoreFile from disk and interlacing the memcache flush with a
+      // compaction of that file as we go. The notion is that interlacing
+      // would take as long as a pure flush with the added benefit of
+      // having one less file in the store. Experiments show that it takes
+      // two to three times the amount of time flushing -- more column
+      // families make the two timings come closer together -- but it also
+      // complicates the flush. Disabled for now. Needs work picking which
+      // file to interlace (favor references first, etc.)
+      //
+      // Related, it looks like the 'merging compactions' of the BigTable
+      // paper interlace a memcache flush. We don't.
       try {
-        for (Map.Entry<HStoreKey, byte []> es: inputCache.entrySet()) {
-          HStoreKey curkey = es.getKey();
-          if (this.familyName.
-              equals(HStoreKey.extractFamily(curkey.getColumn()))) {
-            out.append(curkey, new ImmutableBytesWritable(es.getValue()));
+        if (this.conf.getBoolean("hbase.hstore.compact.on.flush", false) &&
+            this.storefiles.size() > 0) {
+          compact(out, inputCache.entrySet().iterator(),
+            this.readers.get(this.storefiles.firstKey()));
+        } else {
+          for (Map.Entry<HStoreKey, byte []> es: inputCache.entrySet()) {
+            HStoreKey curkey = es.getKey();
+            if (this.familyName.
+                equals(HStoreKey.extractFamily(curkey.getColumn()))) {
+              out.append(curkey, new ImmutableBytesWritable(es.getValue()));
+            }
           }
         }
       } finally {
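The interlaced flush is gated on a boolean property read with conf.getBoolean above. A minimal sketch, not part of the patch, of turning the experiment on; only the property name is taken from the hunk:

import org.apache.hadoop.conf.Configuration;

public class CompactOnFlushExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Off by default; per the comment above, interlacing measured two to
    // three times slower than a plain flush.
    conf.setBoolean("hbase.hstore.compact.on.flush", true);
    System.out.println(conf.getBoolean("hbase.hstore.compact.on.flush", false));
  }
}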
@@ -486,14 +507,14 @@ class HStore implements HConstants {
           this.lock.releaseWriteLock();
         }
       }
-      return getAllMapFiles();
+      return;
     }
   }
 
   /**
   * @return - vector of all the HStore files in use
   */
-  Vector<HStoreFile> getAllMapFiles() {
+  Vector<HStoreFile> getAllStoreFiles() {
    this.lock.obtainReadLock();
    try {
      return new Vector<HStoreFile>(storefiles.values());
@@ -505,6 +526,14 @@ class HStore implements HConstants {
   //////////////////////////////////////////////////////////////////////////////
   // Compaction
   //////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * @return True if this store needs compaction.
+   */
+  public boolean needsCompaction() {
+    return this.storefiles != null &&
+      this.storefiles.size() >= this.compactionThreshold;
+  }
 
   /**
    * Compact the back-HStores. This method may take some time, so the calling
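A hedged sketch of the intended call pattern: flushes pile up store files, and some caller (presumably HRegion) polls needsCompaction() rather than compacting on every flush. The helper below and the call to the store's compact() entry point are assumptions, not code from the patch:

import java.io.IOException;

class CompactionCheckExample {
  // After a flush: compact only once storefiles.size() crosses the
  // configured compactionThreshold.
  static void maybeCompact(HStore store) throws IOException {
    if (store.needsCompaction()) {
      store.compact();
    }
  }
}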
@@ -528,11 +557,24 @@ class HStore implements HConstants {
     compactHelper(false);
   }
 
-  void compactHelper(boolean deleteSequenceInfo) throws IOException {
+  void compactHelper(final boolean deleteSequenceInfo)
+  throws IOException {
+    compactHelper(deleteSequenceInfo, -1);
+  }
+
+  /*
+   * @param deleteSequenceInfo True if we are to set the sequence number to -1
+   * on the compacted file.
+   * @param maxSeenSeqID We may have already calculated the maxSeenSeqID. If
+   * so, pass it here. Otherwise, pass -1 and it will be calculated inside
+   * this method.
+   * @throws IOException
+   */
+  void compactHelper(final boolean deleteSequenceInfo, long maxSeenSeqID)
+  throws IOException {
     synchronized(compactLock) {
       Path curCompactStore =
         HStoreFile.getHStoreDir(compactdir, regionName, familyName);
-      fs.mkdirs(curCompactStore);
       if(LOG.isDebugEnabled()) {
         LOG.debug("started compaction of " + storefiles.size() + " files in " +
           curCompactStore.toString());
@@ -547,28 +589,32 @@ class HStore implements HConstants {
         this.lock.releaseWriteLock();
       }
 
-      // Compute the max-sequenceID seen in any of the to-be-compacted
-      // TreeMaps
-      long maxSeenSeqID = -1;
-      for (HStoreFile hsf: toCompactFiles) {
-        long seqid = hsf.loadInfo(fs);
-        if(seqid > 0) {
-          if(seqid > maxSeenSeqID) {
-            maxSeenSeqID = seqid;
-          }
-        }
-      }
-
-      HStoreFile compactedOutputFile
-        = new HStoreFile(conf, compactdir, regionName, familyName, -1);
-      if(toCompactFiles.size() == 1) {
-        // TODO: Only rewrite if NOT a HSF reference file.
-        if(LOG.isDebugEnabled()) {
+      HStoreFile compactedOutputFile =
+        new HStoreFile(conf, compactdir, regionName, familyName, -1);
+      if (toCompactFiles.size() < 1 ||
+        (toCompactFiles.size() == 1 &&
+          !toCompactFiles.get(0).isReference())) {
+        if (LOG.isDebugEnabled()) {
           LOG.debug("nothing to compact for " + this.storeName);
         }
-        HStoreFile hsf = toCompactFiles.elementAt(0);
-        if(hsf.loadInfo(fs) == -1) {
-          return;
+        if (deleteSequenceInfo && toCompactFiles.size() == 1) {
+          toCompactFiles.get(0).writeInfo(fs, -1);
+        }
+        return;
+      }
+
+      fs.mkdirs(curCompactStore);
+
+      // Compute the max-sequenceID seen in any of the to-be-compacted
+      // TreeMaps if it hasn't been passed in to us.
+      if (maxSeenSeqID == -1) {
+        for (HStoreFile hsf: toCompactFiles) {
+          long seqid = hsf.loadInfo(fs);
+          if(seqid > 0) {
+            if(seqid > maxSeenSeqID) {
+              maxSeenSeqID = seqid;
+            }
+          }
         }
       }
 
@@ -577,108 +623,11 @@ class HStore implements HConstants {
         compactedOutputFile.getWriter(this.fs, this.compression,
           this.bloomFilter);
       try {
-        // We create a new set of MapFile.Reader objects so we don't screw up
-        // the caching associated with the currently-loaded ones.
-        //
-        // Our iteration-based access pattern is practically designed to ruin
-        // the cache.
-        //
-        // We work by opening a single MapFile.Reader for each file, and
-        // iterating through them in parallel. We always increment the
-        // lowest-ranked one. Updates to a single row/column will appear
-        // ranked by timestamp. This allows us to throw out deleted values or
-        // obsolete versions.
-        MapFile.Reader[] rdrs = new MapFile.Reader[toCompactFiles.size()];
-        HStoreKey[] keys = new HStoreKey[toCompactFiles.size()];
-        ImmutableBytesWritable[] vals =
-          new ImmutableBytesWritable[toCompactFiles.size()];
-        boolean[] done = new boolean[toCompactFiles.size()];
-        int pos = 0;
-        for(HStoreFile hsf: toCompactFiles) {
-          rdrs[pos] = hsf.getReader(this.fs, this.bloomFilter);
-          keys[pos] = new HStoreKey();
-          vals[pos] = new ImmutableBytesWritable();
-          done[pos] = false;
-          pos++;
-        }
-
-        // Now, advance through the readers in order. This will have the
-        // effect of a run-time sort of the entire dataset.
-        int numDone = 0;
-        for(int i = 0; i < rdrs.length; i++) {
-          rdrs[i].reset();
-          done[i] = ! rdrs[i].next(keys[i], vals[i]);
-          if(done[i]) {
-            numDone++;
-          }
-        }
-
-        int timesSeen = 0;
-        Text lastRow = new Text();
-        Text lastColumn = new Text();
-        while(numDone < done.length) {
-          // Find the reader with the smallest key
-          int smallestKey = -1;
-          for(int i = 0; i < rdrs.length; i++) {
-            if(done[i]) {
-              continue;
-            }
-
-            if(smallestKey < 0) {
-              smallestKey = i;
-            } else {
-              if(keys[i].compareTo(keys[smallestKey]) < 0) {
-                smallestKey = i;
-              }
-            }
-          }
-
-          // Reflect the current key/val in the output
-          HStoreKey sk = keys[smallestKey];
-          if(lastRow.equals(sk.getRow())
-              && lastColumn.equals(sk.getColumn())) {
-            timesSeen++;
-          } else {
-            timesSeen = 1;
-          }
-
-          if(timesSeen <= family.getMaxVersions()) {
-            // Keep old versions until we have maxVersions worth.
-            // Then just skip them.
-            if(sk.getRow().getLength() != 0
-                && sk.getColumn().getLength() != 0) {
-              // Only write out objects which have a non-zero length key and
-              // value
-              compactedOut.append(sk, vals[smallestKey]);
-            }
-          }
-
-          // TODO: I don't know what to do about deleted values. I currently
-          // include the fact that the item was deleted as a legitimate
-          // "version" of the data. Maybe it should just drop the deleted
-          // val?
-
-          // Update last-seen items
-          lastRow.set(sk.getRow());
-          lastColumn.set(sk.getColumn());
-
-          // Advance the smallest key. If that reader's all finished, then
-          // mark it as done.
-          if(! rdrs[smallestKey].next(keys[smallestKey],
-              vals[smallestKey])) {
-            done[smallestKey] = true;
-            rdrs[smallestKey].close();
-            numDone++;
-          }
-        }
+        compact(compactedOut, toCompactFiles);
       } finally {
         compactedOut.close();
       }
 
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("writing new compacted HStore " + compactedOutputFile);
-      }
-
       // Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap.
       if((! deleteSequenceInfo) && maxSeenSeqID >= 0) {
         compactedOutputFile.writeInfo(fs, maxSeenSeqID);
@@ -691,8 +640,7 @@ class HStore implements HConstants {
       DataOutputStream out = new DataOutputStream(fs.create(filesToReplace));
       try {
         out.writeInt(toCompactFiles.size());
-        for(Iterator<HStoreFile> it = toCompactFiles.iterator(); it.hasNext(); ) {
-          HStoreFile hsf = it.next();
+        for(HStoreFile hsf: toCompactFiles) {
           hsf.write(out);
         }
       } finally {
@@ -706,7 +654,207 @@ class HStore implements HConstants {
       // Move the compaction into place.
       processReadyCompaction();
     } finally {
-      fs.delete(compactdir);
+      if (fs.exists(compactdir)) {
+        fs.delete(compactdir);
+      }
+    }
+  }
+
+  /*
+   * Compact passed <code>toCompactFiles</code> into <code>compactedOut</code>.
+   * We create a new set of MapFile.Reader objects so we don't screw up
+   * the caching associated with the currently-loaded ones. Our
+   * iteration-based access pattern is practically designed to ruin
+   * the cache.
+   *
+   * We work by opening a single MapFile.Reader for each file, and
+   * iterating through them in parallel. We always increment the
+   * lowest-ranked one. Updates to a single row/column will appear
+   * ranked by timestamp. This allows us to throw out deleted values or
+   * obsolete versions.
+   * @param compactedOut
+   * @param toCompactFiles
+   * @throws IOException
+   */
+  void compact(final MapFile.Writer compactedOut,
+    final Vector<HStoreFile> toCompactFiles)
+  throws IOException {
+    int size = toCompactFiles.size();
+    CompactionReader[] rdrs = new CompactionReader[size];
+    int index = 0;
+    for (HStoreFile hsf: toCompactFiles) {
+      try {
+        rdrs[index++] =
+          new MapFileCompactionReader(hsf.getReader(fs, bloomFilter));
+      } catch (IOException e) {
+        // Add info about which file threw exception. It may not be in the
+        // exception message so output a message here where we know the
+        // culprit.
+        LOG.warn("Failed with " + e.toString() + ": " + hsf.toString() +
+          (hsf.isReference()? " " + hsf.getReference().toString(): ""));
+        throw e;
+      }
+    }
+    try {
+      compact(compactedOut, rdrs);
+    } finally {
+      for (int i = 0; i < rdrs.length; i++) {
+        if (rdrs[i] != null) {
+          try {
+            rdrs[i].close();
+          } catch (IOException e) {
+            LOG.warn("Exception closing reader", e);
+          }
+        }
+      }
+    }
+  }
+
+  interface CompactionReader {
+    public void close() throws IOException;
+    public boolean next(WritableComparable key, Writable val)
+    throws IOException;
+    public void reset() throws IOException;
+  }
+
+  class MapFileCompactionReader implements CompactionReader {
+    final MapFile.Reader reader;
+
+    MapFileCompactionReader(final MapFile.Reader r) {
+      this.reader = r;
+    }
+
+    public void close() throws IOException {
+      this.reader.close();
+    }
+
+    public boolean next(WritableComparable key, Writable val)
+    throws IOException {
+      return this.reader.next(key, val);
+    }
+
+    public void reset() throws IOException {
+      this.reader.reset();
+    }
+  }
+
+  void compact(final MapFile.Writer compactedOut,
+    final Iterator<Entry<HStoreKey, byte []>> iterator,
+    final MapFile.Reader reader)
+  throws IOException {
+    // Make an instance of a CompactionReader that wraps the iterator.
+    CompactionReader cr = new CompactionReader() {
+      public boolean next(WritableComparable key, Writable val)
+      throws IOException {
+        boolean result = false;
+        while (iterator.hasNext()) {
+          Entry<HStoreKey, byte []> e = iterator.next();
+          HStoreKey hsk = e.getKey();
+          if (familyName.equals(HStoreKey.extractFamily(hsk.getColumn()))) {
+            ((HStoreKey)key).set(hsk);
+            ((ImmutableBytesWritable)val).set(e.getValue());
+            result = true;
+            break;
+          }
+        }
+        return result;
+      }
+
+      @SuppressWarnings("unused")
+      public void reset() throws IOException {
+        // noop.
+      }
+
+      @SuppressWarnings("unused")
+      public void close() throws IOException {
+        // noop.
+      }
+    };
+
+    compact(compactedOut,
+      new CompactionReader [] {cr, new MapFileCompactionReader(reader)});
+  }
+
+  void compact(final MapFile.Writer compactedOut,
+    final CompactionReader [] rdrs)
+  throws IOException {
+    HStoreKey[] keys = new HStoreKey[rdrs.length];
+    ImmutableBytesWritable[] vals = new ImmutableBytesWritable[rdrs.length];
+    boolean[] done = new boolean[rdrs.length];
+    for(int i = 0; i < rdrs.length; i++) {
+      keys[i] = new HStoreKey();
+      vals[i] = new ImmutableBytesWritable();
+      done[i] = false;
+    }
+
+    // Now, advance through the readers in order. This will have the
+    // effect of a run-time sort of the entire dataset.
+    int numDone = 0;
+    for(int i = 0; i < rdrs.length; i++) {
+      rdrs[i].reset();
+      done[i] = ! rdrs[i].next(keys[i], vals[i]);
+      if(done[i]) {
+        numDone++;
+      }
+    }
+
+    int timesSeen = 0;
+    Text lastRow = new Text();
+    Text lastColumn = new Text();
+    while(numDone < done.length) {
+      // Find the reader with the smallest key
+      int smallestKey = -1;
+      for(int i = 0; i < rdrs.length; i++) {
+        if(done[i]) {
+          continue;
+        }
+        if(smallestKey < 0) {
+          smallestKey = i;
+        } else {
+          if(keys[i].compareTo(keys[smallestKey]) < 0) {
+            smallestKey = i;
+          }
+        }
+      }
+
+      // Reflect the current key/val in the output
+      HStoreKey sk = keys[smallestKey];
+      if(lastRow.equals(sk.getRow())
+          && lastColumn.equals(sk.getColumn())) {
+        timesSeen++;
+      } else {
+        timesSeen = 1;
+      }
+
+      if(timesSeen <= family.getMaxVersions()) {
+        // Keep old versions until we have maxVersions worth.
+        // Then just skip them.
+        if(sk.getRow().getLength() != 0
+            && sk.getColumn().getLength() != 0) {
+          // Only write out objects which have a non-zero length key and
+          // value
+          compactedOut.append(sk, vals[smallestKey]);
+        }
+      }
+
+      // TODO: I don't know what to do about deleted values. I currently
+      // include the fact that the item was deleted as a legitimate
+      // "version" of the data. Maybe it should just drop the deleted
+      // val?
+
+      // Update last-seen items
+      lastRow.set(sk.getRow());
+      lastColumn.set(sk.getColumn());
+
+      // Advance the smallest key. If that reader's all finished, then
+      // mark it as done.
+      if(!rdrs[smallestKey].next(keys[smallestKey],
+          vals[smallestKey])) {
+        done[smallestKey] = true;
+        rdrs[smallestKey].close();
+        rdrs[smallestKey] = null;
+        numDone++;
+      }
     }
   }
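Because the merge now runs against the CompactionReader interface rather than MapFile.Reader directly, other key/value sources can drive it; the iterator-wrapping compact() above does exactly that for flushes. Below is a hedged sketch, not in the patch, of a third reader backed by a TreeMap, which could let compact() be unit-tested without MapFiles on disk. The class is invented; HStoreKey.set and ImmutableBytesWritable.set are used as in the anonymous reader above, and the java.util imports are assumed:

class TreeMapCompactionReader implements CompactionReader {
  private final TreeMap<HStoreKey, byte []> map;
  private Iterator<Map.Entry<HStoreKey, byte []>> it;

  TreeMapCompactionReader(final TreeMap<HStoreKey, byte []> m) {
    this.map = m;
    this.it = m.entrySet().iterator();
  }

  public boolean next(WritableComparable key, Writable val)
  throws IOException {
    if (!this.it.hasNext()) {
      return false;
    }
    Map.Entry<HStoreKey, byte []> e = this.it.next();
    ((HStoreKey)key).set(e.getKey());
    ((ImmutableBytesWritable)val).set(e.getValue());
    return true;
  }

  public void reset() throws IOException {
    // Unlike the single-pass flush reader above, a TreeMap can rewind.
    this.it = this.map.entrySet().iterator();
  }

  public void close() throws IOException {
    // noop: nothing on disk to release.
  }
}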
@@ -773,21 +921,19 @@ class HStore implements HConstants {
         }
       }
 
+      Vector<HStoreFile> toDelete = new Vector<HStoreFile>(keys.size());
       for (Long key: keys) {
         MapFile.Reader reader = this.readers.remove(key);
         if (reader != null) {
           reader.close();
         }
         HStoreFile hsf = this.storefiles.remove(key);
-        // 4. Delete all old files, no longer needed
-        hsf.delete();
-      }
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("deleted " + toCompactFiles.size() + " old file(s)");
+        // 4. Add to the toDelete files all old files, no longer needed
+        toDelete.add(hsf);
       }
 
-      // What if we fail now? The above deletes will fail silently. We'd better
-      // make sure not to write out any new files with the same names as
+      // What if we fail now? The above deletes will fail silently. We'd
+      // better make sure not to write out any new files with the same names as
       // something we delete, though.
 
       // 5. Moving the new MapFile into place
@@ -800,9 +946,23 @@ class HStore implements HConstants {
           compactdir.toString() +
           " to " + finalCompactedFile.toString() + " in " + dir.toString());
       }
-      compactedFile.rename(this.fs, finalCompactedFile);
+      if (!compactedFile.rename(this.fs, finalCompactedFile)) {
+        LOG.error("Failed move of compacted file " +
+          finalCompactedFile.toString());
+        return;
+      }
+
+      // Safe to delete now that the compaction has been moved into place.
+      for (HStoreFile hsf: toDelete) {
+        if (hsf.getFileId() == finalCompactedFile.getFileId()) {
+          // Be careful we do not delete the file we have just compacted to.
+          LOG.warn("Weird. File to delete has same name as the new " +
+            "compacted file (skipping): " + hsf.getFileId());
+          continue;
+        }
+        hsf.delete();
+      }
 
-      // Fail here? No worries.
       Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs));
 
       // 6. Loading the new TreeMap.
@@ -810,7 +970,6 @@ class HStore implements HConstants {
         finalCompactedFile.getReader(this.fs, this.bloomFilter));
       this.storefiles.put(orderVal, finalCompactedFile);
     } finally {
-
       // 7. Releasing the write-lock
       this.lock.releaseWriteLock();
     }
@@ -838,6 +997,9 @@ class HStore implements HConstants {
       map.reset();
       ImmutableBytesWritable readval = new ImmutableBytesWritable();
       HStoreKey readkey = (HStoreKey)map.getClosest(key, readval);
+      if (readkey == null) {
+        continue;
+      }
       do {
         Text readcol = readkey.getColumn();
         if (results.get(readcol) == null
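For context on the new null check: Hadoop's MapFile.Reader.getClosest returns the record at or after the requested key, and null when every key in the file sorts before it, so such a file simply has nothing for this read. A hedged fragment restating the contract (the helper class is invented):

import java.io.IOException;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

class GetClosestExample {
  // Returns val filled with the first entry at or after 'key', or null
  // when the file holds no such entry and must be skipped.
  static Writable firstAtOrAfter(MapFile.Reader map, WritableComparable key,
      Writable val) throws IOException {
    return map.getClosest(key, val) == null ? null : val;
  }
}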
@@ -1004,7 +1166,7 @@ class HStore implements HConstants {
   /**
    * @return Returns the number of map files currently in use
    */
-  int getNMaps() {
+  int countOfStoreFiles() {
     this.lock.obtainReadLock();
     try {
       return storefiles.size();
@@ -1014,6 +1176,22 @@ class HStore implements HConstants {
     }
   }
 
+  boolean hasReferences() {
+    boolean result = false;
+    this.lock.obtainReadLock();
+    try {
+      for (HStoreFile hsf: this.storefiles.values()) {
+        if (hsf.isReference()) {
+          result = true;
+          break;
+        }
+      }
+    } finally {
+      this.lock.releaseReadLock();
+    }
+    return result;
+  }
+
   //////////////////////////////////////////////////////////////////////////////
   // File administration
   //////////////////////////////////////////////////////////////////////////////
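A hedged sketch of a likely consumer of hasReferences(): reference files are presumably the half-file pointers a region split leaves behind (per isReference()/getReference() above), and a store still holding them should arguably not be split again until a compaction rewrites them. The guard below is an assumption, not part of the patch:

class SplitCheckExample {
  // A region holding reference files should wait for a compaction to
  // rewrite them before it can be split again.
  static boolean canSplit(HStore[] stores) {
    for (HStore s: stores) {
      if (s.hasReferences()) {
        return false;
      }
    }
    return true;
  }
}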
@@ -1038,6 +1216,11 @@ class HStore implements HConstants {
 
     return new HStoreScanner(timestamp, targetCols, firstRow);
   }
+
+  @Override
+  public String toString() {
+    return this.storeName;
+  }
 
   //////////////////////////////////////////////////////////////////////////////
   // This class implements the HScannerInterface.