|
@@ -62,8 +62,6 @@ import org.onelab.filter.RetouchedBloomFilter;
|
|
|
class HStore implements HConstants {
|
|
|
static final Log LOG = LogFactory.getLog(HStore.class);
|
|
|
|
|
|
- static final String COMPACTION_DIR = "compaction.tmp";
|
|
|
- static final String WORKING_COMPACTION = "compaction.inprogress";
|
|
|
static final String COMPACTION_TO_REPLACE = "toreplace";
|
|
|
static final String COMPACTION_DONE = "done";
|
|
|
|
|
@@ -77,11 +75,11 @@ class HStore implements HConstants {
|
|
|
FileSystem fs;
|
|
|
Configuration conf;
|
|
|
Path mapdir;
|
|
|
- Path compactdir;
|
|
|
Path loginfodir;
|
|
|
Path filterDir;
|
|
|
Filter bloomFilter;
|
|
|
private String storeName;
|
|
|
+ private final Path compactionDir;
|
|
|
|
|
|
Integer compactLock = new Integer(0);
|
|
|
Integer flushLock = new Integer(0);
|
|
@@ -133,6 +131,7 @@ class HStore implements HConstants {
|
|
|
FileSystem fs, Path reconstructionLog, Configuration conf)
|
|
|
throws IOException {
|
|
|
this.dir = dir;
|
|
|
+ this.compactionDir = new Path(dir, "compaction.dir");
|
|
|
this.regionName = regionName;
|
|
|
this.family = family;
|
|
|
this.familyName = HStoreKey.extractFamily(this.family.getName());
|
|
@@ -172,17 +171,6 @@ class HStore implements HConstants {
|
|
|
" (no reconstruction log)": " with reconstruction log: " +
|
|
|
reconstructionLog.toString()));
|
|
|
}
|
|
|
-
|
|
|
- // Either restart or get rid of any leftover compaction work. Either way,
|
|
|
- // by the time processReadyCompaction() returns, we can get rid of the
|
|
|
- // existing compaction-dir.
|
|
|
- this.compactdir = new Path(dir, COMPACTION_DIR);
|
|
|
- Path curCompactStore =
|
|
|
- HStoreFile.getHStoreDir(compactdir, regionName, familyName);
|
|
|
- if(fs.exists(curCompactStore)) {
|
|
|
- processReadyCompaction();
|
|
|
- fs.delete(curCompactStore);
|
|
|
- }
|
|
|
|
|
|
// Go through the 'mapdir' and 'loginfodir' together, make sure that all
|
|
|
// MapFiles are in a reliable state. Every entry in 'mapdir' must have a
|
|
@@ -409,7 +397,7 @@ class HStore implements HConstants {
|
|
|
this.readers.clear();
|
|
|
result = new Vector<HStoreFile>(storefiles.values());
|
|
|
this.storefiles.clear();
|
|
|
- LOG.info("closed " + this.storeName);
|
|
|
+ LOG.debug("closed " + this.storeName);
|
|
|
return result;
|
|
|
} finally {
|
|
|
this.lock.releaseWriteLock();
|
|
@@ -563,7 +551,7 @@ class HStore implements HConstants {
|
|
|
throws IOException {
|
|
|
compactHelper(deleteSequenceInfo, -1);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/*
|
|
|
* @param deleteSequenceInfo True if we are to set the sequence number to -1
|
|
|
* on compacted file.
|
|
@@ -577,23 +565,22 @@ class HStore implements HConstants {
|
|
|
long maxId = maxSeenSeqID;
|
|
|
synchronized(compactLock) {
|
|
|
Path curCompactStore =
|
|
|
- HStoreFile.getHStoreDir(compactdir, regionName, familyName);
|
|
|
+ HStoreFile.getHStoreDir(this.compactionDir, regionName, familyName);
|
|
|
if(LOG.isDebugEnabled()) {
|
|
|
LOG.debug("started compaction of " + storefiles.size() + " files in " +
|
|
|
curCompactStore.toString());
|
|
|
}
|
|
|
- try {
|
|
|
- // Grab a list of files to compact.
|
|
|
- Vector<HStoreFile> toCompactFiles = null;
|
|
|
- this.lock.obtainWriteLock();
|
|
|
- try {
|
|
|
- toCompactFiles = new Vector<HStoreFile>(storefiles.values());
|
|
|
- } finally {
|
|
|
- this.lock.releaseWriteLock();
|
|
|
+ if (this.fs.exists(curCompactStore)) {
|
|
|
+ LOG.warn("Cleaning up a previous incomplete compaction at " +
|
|
|
+ curCompactStore.toString());
|
|
|
+ if (!this.fs.delete(curCompactStore)) {
|
|
|
+ LOG.warn("Deleted returned false on " + curCompactStore.toString());
|
|
|
}
|
|
|
-
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ Vector<HStoreFile> toCompactFiles = getFilesToCompact();
|
|
|
HStoreFile compactedOutputFile =
|
|
|
- new HStoreFile(conf, compactdir, regionName, familyName, -1);
|
|
|
+ new HStoreFile(conf, this.compactionDir, regionName, familyName, -1);
|
|
|
if (toCompactFiles.size() < 1 ||
|
|
|
(toCompactFiles.size() == 1 &&
|
|
|
!toCompactFiles.get(0).isReference())) {
|
|
@@ -606,7 +593,9 @@ class HStore implements HConstants {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- fs.mkdirs(curCompactStore);
|
|
|
+ if (!fs.mkdirs(curCompactStore)) {
|
|
|
+ LOG.warn("Mkdir on " + curCompactStore.toString() + " failed");
|
|
|
+ }
|
|
|
|
|
|
// Compute the max-sequenceID seen in any of the to-be-compacted
|
|
|
// TreeMaps if it hasn't been passed in to us.
|
|
@@ -657,13 +646,31 @@ class HStore implements HConstants {
|
|
|
// Move the compaction into place.
|
|
|
processReadyCompaction();
|
|
|
} finally {
|
|
|
- if (fs.exists(compactdir)) {
|
|
|
- fs.delete(compactdir);
|
|
|
+ // Clean up the parent -- the region dir in the compactions directory.
|
|
|
+ if (this.fs.exists(curCompactStore.getParent())) {
|
|
|
+ if (!this.fs.delete(curCompactStore.getParent())) {
|
|
|
+ LOG.warn("Delete returned false deleting " +
|
|
|
+ curCompactStore.getParent().toString());
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /*
|
|
|
+ * @return list of files to compact
|
|
|
+ */
|
|
|
+ private Vector<HStoreFile> getFilesToCompact() {
|
|
|
+ Vector<HStoreFile> toCompactFiles = null;
|
|
|
+ this.lock.obtainWriteLock();
|
|
|
+ try {
|
|
|
+ toCompactFiles = new Vector<HStoreFile>(storefiles.values());
|
|
|
+ } finally {
|
|
|
+ this.lock.releaseWriteLock();
|
|
|
+ }
|
|
|
+ return toCompactFiles;
|
|
|
+ }
|
|
|
+
|
|
|
/*
|
|
|
* Compact passed <code>toCompactFiles</code> into <code>compactedOut</code>.
|
|
|
* We create a new set of MapFile.Reader objects so we don't screw up
|
|
@@ -886,33 +893,34 @@ class HStore implements HConstants {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
+ /*
|
|
|
* It's assumed that the compactLock will be acquired prior to calling this
|
|
|
* method! Otherwise, it is not thread-safe!
|
|
|
*
|
|
|
* It works by processing a compaction that's been written to disk.
|
|
|
*
|
|
|
- * It is usually invoked at the end of a compaction, but might also be
|
|
|
+ * <p>It is usually invoked at the end of a compaction, but might also be
|
|
|
* invoked at HStore startup, if the prior execution died midway through.
|
|
|
+ *
|
|
|
+ * <p>Moving the compacted TreeMap into place means:
|
|
|
+ * <pre>
|
|
|
+ * 1) Acquiring the write-lock
|
|
|
+ * 2) Figuring out what MapFiles are going to be replaced
|
|
|
+ * 3) Moving the new compacted MapFile into place
|
|
|
+ * 4) Unloading all the replaced MapFiles.
|
|
|
+ * 5) Deleting all the old MapFile files.
|
|
|
+ * 6) Loading the new TreeMap.
|
|
|
+ * 7) Releasing the write-lock
|
|
|
+ * </pre>
|
|
|
*/
|
|
|
void processReadyCompaction() throws IOException {
|
|
|
- // Move the compacted TreeMap into place.
|
|
|
- // That means:
|
|
|
- // 1) Acquiring the write-lock
|
|
|
- // 2) Figuring out what MapFiles are going to be replaced
|
|
|
- // 3) Unloading all the replaced MapFiles.
|
|
|
- // 4) Deleting all the old MapFile files.
|
|
|
- // 5) Moving the new MapFile into place
|
|
|
- // 6) Loading the new TreeMap.
|
|
|
- // 7) Releasing the write-lock
|
|
|
-
|
|
|
// 1. Acquiring the write-lock
|
|
|
Path curCompactStore =
|
|
|
- HStoreFile.getHStoreDir(compactdir, regionName, familyName);
|
|
|
+ HStoreFile.getHStoreDir(this.compactionDir, regionName, familyName);
|
|
|
this.lock.obtainWriteLock();
|
|
|
try {
|
|
|
Path doneFile = new Path(curCompactStore, COMPACTION_DONE);
|
|
|
- if(!fs.exists(doneFile)) {
|
|
|
+ if (!fs.exists(doneFile)) {
|
|
|
// The last execution didn't finish the compaction, so there's nothing
|
|
|
// we can do. We'll just have to redo it. Abandon it and return.
|
|
|
LOG.warn("Redoing a failed compaction");
|
|
@@ -920,7 +928,6 @@ class HStore implements HConstants {
|
|
|
}
|
|
|
|
|
|
// 2. Load in the files to be deleted.
|
|
|
- // (Figuring out what MapFiles are going to be replaced)
|
|
|
Vector<HStoreFile> toCompactFiles = new Vector<HStoreFile>();
|
|
|
Path filesToReplace = new Path(curCompactStore, COMPACTION_TO_REPLACE);
|
|
|
DataInputStream in = new DataInputStream(fs.open(filesToReplace));
|
|
@@ -936,41 +943,16 @@ class HStore implements HConstants {
|
|
|
in.close();
|
|
|
}
|
|
|
|
|
|
- // 3. Unload all the replaced MapFiles. Do it by getting keys of all
|
|
|
- // to remove. Then cycling on keys, removing, closing and deleting.
|
|
|
-
|
|
|
- // What if we crash at this point? No big deal; we will restart
|
|
|
- // processReadyCompaction(), and nothing has been lost.
|
|
|
- Vector<Long> keys = new Vector<Long>(toCompactFiles.size());
|
|
|
- for(Map.Entry<Long, HStoreFile> e: storefiles.entrySet()) {
|
|
|
- if(toCompactFiles.contains(e.getValue())) {
|
|
|
- keys.add(e.getKey());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- Vector<HStoreFile> toDelete = new Vector<HStoreFile>(keys.size());
|
|
|
- for (Long key: keys) {
|
|
|
- MapFile.Reader reader = this.readers.remove(key);
|
|
|
- if (reader != null) {
|
|
|
- reader.close();
|
|
|
- }
|
|
|
- HStoreFile hsf = this.storefiles.remove(key);
|
|
|
- // 4. Add to the toDelete files all old files, no longer needed
|
|
|
- toDelete.add(hsf);
|
|
|
- }
|
|
|
-
|
|
|
- // What if we fail now? The above deletes will fail silently. We'd
|
|
|
- // better make sure not to write out any new files with the same names as
|
|
|
- // something we delete, though.
|
|
|
-
|
|
|
- // 5. Moving the new MapFile into place
|
|
|
+ // 3. Moving the new MapFile into place.
|
|
|
HStoreFile compactedFile
|
|
|
- = new HStoreFile(conf, compactdir, regionName, familyName, -1);
|
|
|
+ = new HStoreFile(conf, this.compactionDir, regionName, familyName, -1);
|
|
|
+ // obtainNewHStoreFile does its best to generate a filename that does not
|
|
|
+ // currently exist.
|
|
|
HStoreFile finalCompactedFile
|
|
|
= HStoreFile.obtainNewHStoreFile(conf, dir, regionName, familyName, fs);
|
|
|
if(LOG.isDebugEnabled()) {
|
|
|
LOG.debug("moving " + compactedFile.toString() + " in " +
|
|
|
- compactdir.toString() +
|
|
|
+ this.compactionDir.toString() +
|
|
|
" to " + finalCompactedFile.toString() + " in " + dir.toString());
|
|
|
}
|
|
|
if (!compactedFile.rename(this.fs, finalCompactedFile)) {
|
|
@@ -978,24 +960,37 @@ class HStore implements HConstants {
|
|
|
finalCompactedFile.toString());
|
|
|
return;
|
|
|
}
|
|
|
-
|
|
|
- // Safe to delete now compaction has been moved into place.
|
|
|
- for (HStoreFile hsf: toDelete) {
|
|
|
- if (hsf.getFileId() == finalCompactedFile.getFileId()) {
|
|
|
- // Be careful we do not delte the just compacted file.
|
|
|
- LOG.warn("Weird. File to delete has same name as one we are " +
|
|
|
- "about to delete (skipping): " + hsf.getFileId());
|
|
|
+
|
|
|
+ // 4. and 5. Unload all the replaced MapFiles, close and delete.
|
|
|
+ Vector<Long> toDelete = new Vector<Long>(toCompactFiles.size());
|
|
|
+ for (Map.Entry<Long, HStoreFile> e: this.storefiles.entrySet()) {
|
|
|
+ if (!toCompactFiles.contains(e.getValue())) {
|
|
|
continue;
|
|
|
}
|
|
|
- hsf.delete();
|
|
|
+ Long key = e.getKey();
|
|
|
+ MapFile.Reader reader = this.readers.remove(key);
|
|
|
+ if (reader != null) {
|
|
|
+ reader.close();
|
|
|
+ }
|
|
|
+ toDelete.add(key);
|
|
|
}
|
|
|
+
|
|
|
+ try {
|
|
|
+ for (Long key: toDelete) {
|
|
|
+ HStoreFile hsf = this.storefiles.remove(key);
|
|
|
+ hsf.delete();
|
|
|
+ }
|
|
|
|
|
|
- Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs));
|
|
|
-
|
|
|
- // 6. Loading the new TreeMap.
|
|
|
- this.readers.put(orderVal,
|
|
|
- finalCompactedFile.getReader(this.fs, this.bloomFilter));
|
|
|
- this.storefiles.put(orderVal, finalCompactedFile);
|
|
|
+ // 6. Loading the new TreeMap.
|
|
|
+ Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs));
|
|
|
+ this.readers.put(orderVal,
|
|
|
+ finalCompactedFile.getReader(this.fs, this.bloomFilter));
|
|
|
+ this.storefiles.put(orderVal, finalCompactedFile);
|
|
|
+ } finally {
|
|
|
+ LOG.warn("Failed replacing compacted files. Compacted fle is " +
|
|
|
+ finalCompactedFile.toString() + ". Files replaced are " +
|
|
|
+ toCompactFiles.toString() + " some of which may have been removed");
|
|
|
+ }
|
|
|
} finally {
|
|
|
// 7. Releasing the write-lock
|
|
|
this.lock.releaseWriteLock();
|