@@ -542,7 +542,8 @@ class HStore implements HConstants {
     HBaseConfiguration conf) throws IOException {

     this.dir = dir;
-    this.compactionDir = new Path(dir, "compaction.dir");
+    this.compactionDir = new Path(HRegion.getRegionDir(dir, encodedName),
+      "compaction.dir");
     this.regionName = regionName;
     this.encodedRegionName = encodedName;
     this.family = family;
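The first hunk moves the compaction working directory from the table level down under the region's own directory. A before/after sketch of the resulting layout, using hypothetical path values and assuming HRegion.getRegionDir(dir, encodedName) resolves to dir/encodedName:

    // Hypothetical values, for illustration only.
    Path dir = new Path("/hbase/mytable");
    String encodedName = "1028785192";

    // Before: one directory shared by every region of the table:
    //   /hbase/mytable/compaction.dir
    Path before = new Path(dir, "compaction.dir");

    // After: scoped to the region being compacted:
    //   /hbase/mytable/1028785192/compaction.dir
    Path after = new Path(HRegion.getRegionDir(dir, encodedName),
      "compaction.dir");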
@@ -603,16 +604,7 @@ class HStore implements HConstants {
     // means it was built prior to the previous run of HStore, and so it cannot
     // contain any updates also contained in the log.

-    long maxSeqID = -1;
-    for (HStoreFile hsf: hstoreFiles) {
-      long seqid = hsf.loadInfo(fs);
-      if(seqid > 0) {
-        if(seqid > maxSeqID) {
-          maxSeqID = seqid;
-        }
-      }
-    }
-    this.maxSeqId = maxSeqID;
+    this.maxSeqId = getMaxSequenceId(hstoreFiles);
     if (LOG.isDebugEnabled()) {
       LOG.debug("maximum sequence id for hstore " + storeName + " is " +
         this.maxSeqId);
@@ -641,6 +633,25 @@ class HStore implements HConstants {
     }
   }

+  /*
+   * @param hstoreFiles
+   * @return Maximum sequence number found or -1.
+   * @throws IOException
+   */
+  private long getMaxSequenceId(final List<HStoreFile> hstoreFiles)
+  throws IOException {
+    long maxSeqID = -1;
+    for (HStoreFile hsf : hstoreFiles) {
+      long seqid = hsf.loadInfo(fs);
+      if (seqid > 0) {
+        if (seqid > maxSeqID) {
+          maxSeqID = seqid;
+        }
+      }
+    }
+    return maxSeqID;
+  }
+
   long getMaxSequenceId() {
     return this.maxSeqId;
   }
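The new getMaxSequenceId helper is the loop the previous hunk removed, extracted so the constructor above and compact() below can share it. It keeps the old loop's contract: ids of 0 or less are ignored, and -1 is returned when no file reports a sequence id. A small illustration of that sentinel, with hypothetical inputs:

    // Hypothetical call illustrating the -1 sentinel.
    long none = getMaxSequenceId(new ArrayList<HStoreFile>()); // -1: no files
    // For files reporting ids {4, 9, 7} the result is 9; ids <= 0 are
    // skipped, so a list reporting only {0, -3} also yields -1.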
@@ -670,16 +681,17 @@ class HStore implements HConstants {
       try {
         HLogKey key = new HLogKey();
         HLogEdit val = new HLogEdit();
+        long skippedEdits = 0;
         while (login.next(key, val)) {
           maxSeqIdInLog = Math.max(maxSeqIdInLog, key.getLogSeqNum());
           if (key.getLogSeqNum() <= maxSeqID) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Skipping edit <" + key.toString() + "=" +
-                val.toString() + "> key sequence: " + key.getLogSeqNum() +
-                " max sequence: " + maxSeqID);
-            }
+            skippedEdits++;
             continue;
           }
+          if (skippedEdits > 0 && LOG.isDebugEnabled()) {
+            LOG.debug("Skipped " + skippedEdits +
+              " edits because sequence id <= " + maxSeqID);
+          }
           // Check this edit is for me. Also, guard against writing
           // METACOLUMN info such as HBASE::CACHEFLUSH entries
           Text column = val.getColumn();
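Counting skipped edits and logging a total replaces the per-edit debug line, which could flood the log when replaying a long reconstruction log. A standalone sketch of the counting pattern, with the summary emitted once after the loop and hypothetical next/shouldSkip/apply helpers standing in for the HLog reader:

    long skipped = 0;
    while (next(key, val)) {    // hypothetical reader loop
      if (shouldSkip(key)) {    // e.g. key.getLogSeqNum() <= maxSeqID
        skipped++;
        continue;
      }
      apply(key, val);
    }
    if (skipped > 0 && LOG.isDebugEnabled()) {
      LOG.debug("Skipped " + skipped + " edits during log replay");
    }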
@@ -977,119 +989,90 @@ class HStore implements HConstants {
    * @return true if compaction completed successfully
    */
   boolean compact() throws IOException {
-    long maxId = -1;
     synchronized (compactLock) {
-      Path curCompactStore = HStoreFile.getHStoreDir(this.compactionDir,
-        encodedRegionName, familyName);
+      Path curCompactStore = getCompactionDir();
       if (LOG.isDebugEnabled()) {
-        LOG.debug("started compaction of " + storefiles.size() + " files in " +
-          curCompactStore.toString());
+        LOG.debug("started compaction of " + storefiles.size() +
+          " files using " + curCompactStore.toString());
       }
       if (this.fs.exists(curCompactStore)) {
-        LOG.warn("Cleaning up a previous incomplete compaction at " +
-          curCompactStore.toString());
-        if (!this.fs.delete(curCompactStore)) {
-          LOG.warn("Deleted returned false on " + curCompactStore.toString());
+        // Clean out its content in prep. for this new compaction. Has either
+        // aborted previous compaction or it has content of a previous
+        // compaction.
+        Path [] toRemove = this.fs.listPaths(new Path [] {curCompactStore});
+        for (int i = 0; i < toRemove.length; i++) {
+          this.fs.delete(toRemove[i]);
         }
       }
-      try {
-        // Storefiles are keyed by sequence id. The oldest file comes first.
-        // We need to return out of here a List that has the newest file as
-        // first.
-        List<HStoreFile> filesToCompact =
-          new ArrayList<HStoreFile>(this.storefiles.values());
-        Collections.reverse(filesToCompact);
-
-        HStoreFile compactedOutputFile = new HStoreFile(conf,
-          this.compactionDir, encodedRegionName, familyName, -1);
-        if (filesToCompact.size() < 1 ||
-          (filesToCompact.size() == 1 &&
-            !filesToCompact.get(0).isReference())) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("nothing to compact for " + this.storeName);
-          }
-          return false;
-        }
-
-        if (!fs.mkdirs(curCompactStore)) {
-          LOG.warn("Mkdir on " + curCompactStore.toString() + " failed");
-        }
-
-        // Compute the max-sequenceID seen in any of the to-be-compacted
-        // TreeMaps if it hasn't been passed in to us.
-        if (maxId == -1) {
-          for (HStoreFile hsf: filesToCompact) {
-            long seqid = hsf.loadInfo(fs);
-            if (seqid > 0) {
-              if (seqid > maxId) {
-                maxId = seqid;
-              }
-            }
-          }
-        }
-
-        // Step through them, writing to the brand-new TreeMap
-        MapFile.Writer compactedOut = compactedOutputFile.getWriter(this.fs,
-          this.compression, this.bloomFilter);
-        try {
-          compactHStoreFiles(compactedOut, filesToCompact);
-        } finally {
-          compactedOut.close();
+      // Storefiles are keyed by sequence id. The oldest file comes first.
+      // We need to return out of here a List that has the newest file first.
+      List<HStoreFile> filesToCompact =
+        new ArrayList<HStoreFile>(this.storefiles.values());
+      Collections.reverse(filesToCompact);
+      if (filesToCompact.size() < 1 ||
+        (filesToCompact.size() == 1 && !filesToCompact.get(0).isReference())) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("nothing to compact for " + this.storeName);
         }
+        return false;
+      }

-        // Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap.
-        if (maxId >= 0) {
-          compactedOutputFile.writeInfo(fs, maxId);
-        } else {
-          compactedOutputFile.writeInfo(fs, -1);
-        }
+      if (!fs.exists(curCompactStore) && !fs.mkdirs(curCompactStore)) {
+        LOG.warn("Mkdir on " + curCompactStore.toString() + " failed");
+        return false;
+      }

-        // Write out a list of data files that we're replacing
-        Path filesToReplace = new Path(curCompactStore, COMPACTION_TO_REPLACE);
-        DataOutputStream out = new DataOutputStream(fs.create(filesToReplace));
-        try {
-          out.writeInt(filesToCompact.size());
-          for (HStoreFile hsf: filesToCompact) {
-            hsf.write(out);
-          }
-        } finally {
-          out.close();
-        }
+      // Step through them, writing to the brand-new TreeMap
+      HStoreFile compactedOutputFile = new HStoreFile(conf, this.compactionDir,
+        encodedRegionName, familyName, -1);
+      MapFile.Writer compactedOut = compactedOutputFile.getWriter(this.fs,
+        this.compression, this.bloomFilter);
+      try {
+        compactHStoreFiles(compactedOut, filesToCompact);
+      } finally {
+        compactedOut.close();
+      }

-        // Indicate that we're done.
-        Path doneFile = new Path(curCompactStore, COMPACTION_DONE);
-        (new DataOutputStream(fs.create(doneFile))).close();
+      // Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap.
+      // Compute max-sequenceID seen in any of the to-be-compacted TreeMaps.
+      long maxId = getMaxSequenceId(filesToCompact);
+      compactedOutputFile.writeInfo(fs, maxId);

-        // Move the compaction into place.
-        completeCompaction();
-        return true;
-      } finally {
-        // Clean up the parent -- the region dir in the compactions directory.
-        if (this.fs.exists(curCompactStore.getParent())) {
-          if (!this.fs.delete(curCompactStore.getParent())) {
-            LOG.warn("Delete returned false deleting " +
-              curCompactStore.getParent().toString());
-          }
+      // Write out a list of data files that we're replacing
+      Path filesToReplace = new Path(curCompactStore, COMPACTION_TO_REPLACE);
+      DataOutputStream out = new DataOutputStream(fs.create(filesToReplace));
+      try {
+        out.writeInt(filesToCompact.size());
+        for (HStoreFile hsf : filesToCompact) {
+          hsf.write(out);
         }
-      }
+      } finally {
+        out.close();
+      }
+
+      // Indicate that we're done.
+      Path doneFile = new Path(curCompactStore, COMPACTION_DONE);
+      (new DataOutputStream(fs.create(doneFile))).close();
+
+      // Move the compaction into place.
+      completeCompaction(curCompactStore);
+      return true;
     }
   }

   /*
-   * Compact passed <code>toCompactFiles</code> into <code>compactedOut</code>.
-   * We create a new set of MapFile.Reader objects so we don't screw up
-   * the caching associated with the currently-loaded ones. Our
-   * iteration-based access pattern is practically designed to ruin
-   * the cache.
-   *
-   * We work by opening a single MapFile.Reader for each file, and
-   * iterating through them in parallel. We always increment the
-   * lowest-ranked one. Updates to a single row/column will appear
-   * ranked by timestamp. This allows us to throw out deleted values or
-   * obsolete versions.
-   * @param compactedOut
-   * @param toCompactFiles
-   * @throws IOException
+   * Compact passed <code>toCompactFiles</code> into <code>compactedOut</code>.
+   * We create a new set of MapFile.Reader objects so we don't screw up the
+   * caching associated with the currently-loaded ones. Our iteration-based
+   * access pattern is practically designed to ruin the cache.
+   *
+   * We work by opening a single MapFile.Reader for each file, and iterating
+   * through them in parallel. We always increment the lowest-ranked one.
+   * Updates to a single row/column will appear ranked by timestamp. This
+   * allows us to throw out deleted values or obsolete versions.
+   * @param compactedOut
+   * @param toCompactFiles
+   * @throws IOException
    */
   private void compactHStoreFiles(final MapFile.Writer compactedOut,
     final List<HStoreFile> toCompactFiles) throws IOException {
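The rewritten doc comment describes a classic k-way merge over pre-sorted inputs. A minimal, self-contained sketch of that access pattern, using sorted integer lists in place of MapFile readers; dropping repeated values stands in for discarding obsolete versions, and none of the names below are HStore's:

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    import java.util.PriorityQueue;

    public class KWayMergeSketch {
      /** Merge pre-sorted lists, always advancing the lowest-ranked head. */
      static List<Integer> merge(final List<List<Integer>> inputs) {
        // Each heap entry is {headValue, inputIndex, offsetInInput}.
        PriorityQueue<int[]> heads =
          new PriorityQueue<int[]>(Comparator.comparingInt((int[] e) -> e[0]));
        for (int i = 0; i < inputs.size(); i++) {
          if (!inputs.get(i).isEmpty()) {
            heads.add(new int[] { inputs.get(i).get(0), i, 0 });
          }
        }
        List<Integer> merged = new ArrayList<Integer>();
        Integer previous = null;
        while (!heads.isEmpty()) {
          int[] e = heads.poll();             // always take the lowest head
          if (previous == null || e[0] != previous.intValue()) {
            merged.add(e[0]);                 // first occurrence wins;
            previous = Integer.valueOf(e[0]); // later duplicates are dropped
          }
          int next = e[2] + 1;
          if (next < inputs.get(e[1]).size()) {
            heads.add(new int[] { inputs.get(e[1]).get(next), e[1], next });
          }
        }
        return merged;
      }
    }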
@@ -1107,6 +1090,7 @@ class HStore implements HConstants {
           // culprit.
           LOG.warn("Failed with " + e.toString() + ": " + hsf.toString() +
             (hsf.isReference()? " " + hsf.getReference().toString(): ""));
+          closeCompactionReaders(rdrs);
           throw e;
         }
       }
@@ -1195,13 +1179,17 @@ class HStore implements HConstants {
         }
       }
     } finally {
-      for (int i = 0; i < rdrs.length; i++) {
-        if (rdrs[i] != null) {
-          try {
-            rdrs[i].close();
-          } catch (IOException e) {
-            LOG.warn("Exception closing reader", e);
-          }
+      closeCompactionReaders(rdrs);
+    }
+  }
+
+  private void closeCompactionReaders(final CompactionReader [] rdrs) {
+    for (int i = 0; i < rdrs.length; i++) {
+      if (rdrs[i] != null) {
+        try {
+          rdrs[i].close();
+        } catch (IOException e) {
+          LOG.warn("Exception closing reader", e);
         }
       }
     }
@@ -1326,11 +1314,11 @@ class HStore implements HConstants {
    * 8) Releasing the write-lock
    * 9) Allow new scanners to proceed.
    * </pre>
+   *
+   * @param curCompactStore Compaction to complete.
    */
-  private void completeCompaction() throws IOException {
-    Path curCompactStore = HStoreFile.getHStoreDir(this.compactionDir,
-      encodedRegionName, familyName);
-
+  private void completeCompaction(final Path curCompactStore)
+  throws IOException {
     // 1. Wait for active scanners to exit
     newScannerLock.writeLock().lock(); // prevent new scanners
     try {
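Steps 1-2 and 8-9 bracketing the swap amount to a quiesce-then-swap locking pattern: block new readers, wait out active ones, mutate, then reopen the gate. A condensed sketch of just that pattern with java.util.concurrent locks; the names are illustrative, not HStore's fields:

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class SwapGuard {
      private final ReentrantReadWriteLock newScannerLock =
        new ReentrantReadWriteLock();
      private final ReentrantReadWriteLock storeLock =
        new ReentrantReadWriteLock();

      void swap(final Runnable replaceFiles) {
        newScannerLock.writeLock().lock();      // 1. no new scanners start
        try {
          storeLock.writeLock().lock();         // 2. wait out active readers
          try {
            replaceFiles.run();                 // 3-7. swap the files
          } finally {
            storeLock.writeLock().unlock();     // 8. release the write-lock
          }
        } finally {
          newScannerLock.writeLock().unlock();  // 9. scanners may proceed
        }
      }
    }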
@@ -1346,6 +1334,7 @@ class HStore implements HConstants {
         // 2. Acquiring the HStore write-lock
         this.lock.writeLock().lock();
       }
+
       try {
         Path doneFile = new Path(curCompactStore, COMPACTION_DONE);
         if (!fs.exists(doneFile)) {
@@ -1366,7 +1355,6 @@ class HStore implements HConstants {
             hsf.readFields(in);
             toCompactFiles.add(hsf);
           }
-
         } finally {
           in.close();
         }
@@ -1412,13 +1400,13 @@ class HStore implements HConstants {
         // 7. Loading the new TreeMap.
         Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs));
         this.readers.put(orderVal,
-            finalCompactedFile.getReader(this.fs, this.bloomFilter));
+          finalCompactedFile.getReader(this.fs, this.bloomFilter));
         this.storefiles.put(orderVal, finalCompactedFile);
       } catch (IOException e) {
         LOG.error("Failed replacing compacted files. Compacted file is " +
-            finalCompactedFile.toString() + ". Files replaced are " +
-            toCompactFiles.toString() +
-            " some of which may have been already removed", e);
+          finalCompactedFile.toString() + ". Files replaced are " +
+          toCompactFiles.toString() +
+          " some of which may have been already removed", e);
       }
     } finally {
       // 8. Releasing the write-lock
@@ -1479,6 +1467,17 @@ class HStore implements HConstants {
     }
   }

+  /*
+   * @return Path to the compaction directory for this column family.
+   * Compaction dir is a subdirectory of the region. Needs to have the
+   * same regiondir/storefamily path prefix; HStoreFile constructor presumes
+   * it (TODO: Fix).
+   */
+  private Path getCompactionDir() {
+    return HStoreFile.getHStoreDir(this.compactionDir,
+      this.encodedRegionName, this.familyName);
+  }
+
   private MapFile.Reader [] getReaders() {
     return this.readers.values().
       toArray(new MapFile.Reader[this.readers.size()]);
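Composing getCompactionDir() with the constructor change in the first hunk shows where a family's working files now land. The layout below is illustrative and assumes HStoreFile.getHStoreDir(dir, region, family) resolves to dir/region/family, which is what the "same regiondir/storefamily path prefix" remark implies:

    // Hypothetical layout for region "1028785192" and family "info":
    //   this.compactionDir = /hbase/mytable/1028785192/compaction.dir
    //   getCompactionDir() = /hbase/mytable/1028785192/compaction.dir/
    //                          1028785192/info
    Path curCompactStore = HStoreFile.getHStoreDir(this.compactionDir,
      this.encodedRegionName, this.familyName);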