@@ -29,9 +29,6 @@ import org.apache.hadoop.conf.*;
 import java.io.*;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * HLog stores all the edits to the HStore.
@@ -56,11 +53,6 @@ import java.util.concurrent.locks.ReentrantLock;
  * older (smaller) than the most-recent CACHEFLUSH message for every HRegion
  * that has a message in F.
  *
- * <p>synchronized methods can never execute in parallel. However, between the
- * start of a cache flush and the completion point, appends are allowed but log
- * rolling is not. To prevent log rolling taking place during this period, a
- * separate reentrant lock is used.
- *
  * <p>TODO: Vuk Ercegovac also pointed out that keeping HBase HRegion edit logs
  * in HDFS is currently flawed. HBase writes edits to logs and to a memcache.
  * The 'atomic' write to the log is meant to serve as insurance against
@@ -82,21 +74,20 @@ public class HLog implements HConstants {
 
   SequenceFile.Writer writer;
   TreeMap<Long, Path> outputfiles = new TreeMap<Long, Path>();
-  HashMap<Text, Long> lastSeqWritten = new HashMap<Text, Long>();
+  volatile boolean insideCacheFlush = false;
+
+  TreeMap<Text, Long> regionToLastFlush = new TreeMap<Text, Long>();
 
   volatile boolean closed = false;
-  AtomicLong logSeqNum = new AtomicLong(0);
-  volatile long filenum = 0;
+  volatile long logSeqNum = 0;
+  long filenum = 0;
   AtomicInteger numEntries = new AtomicInteger(0);
 
-  // This lock prevents starting a log roll during a cache flush.
-  // synchronized is insufficient because a cache flush spans two method calls.
-  private final Lock cacheFlushLock = new ReentrantLock();
+  Integer rollLock = new Integer(0);
 
   /**
    * Split up a bunch of log files, that are no longer being written to,
-   * into new files, one per region. Delete the old log files when finished.
-   *
+   * into new files, one per region. Delete the old log files when ready.
    * @param rootDir Root directory of the HBase instance
    * @param srcDir Directory of log files to split:
    *    e.g. <code>${ROOTDIR}/log_HOST_PORT</code>
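For orientation while reading the hunks below: the patch swaps the ReentrantLock-based coordination for a volatile flag guarded by the HLog monitor, plus a dedicated 'rollLock' object that serializes entries into rollWriter(). Here is a minimal, standalone sketch of that flag-plus-monitor pattern; the class and method names are hypothetical, not code from this patch.

```java
// Sketch of the guarded-wait pattern the patch adopts (assumed names).
class FlushGate {
  // Written only while holding the monitor; the patch also marks it volatile.
  private volatile boolean insideFlush = false;

  synchronized void beginFlush() {
    while (insideFlush) {
      try {
        wait();                 // block until the current flush finishes
      } catch (InterruptedException ie) {
        // re-check the condition, as the patch does
      }
    }
    insideFlush = true;         // we now "own" the flush section
  }

  synchronized void endFlush() {
    insideFlush = false;
    notifyAll();                // wake threads parked in beginFlush()
  }
}
```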
@@ -189,105 +180,109 @@ public class HLog implements HConstants {
     fs.mkdirs(dir);
     rollWriter();
   }
-
-  /**
-   * Called by HRegionServer when it opens a new region to ensure that log
-   * sequence numbers are always greater than the latest sequence number of
-   * the region being brought on-line.
-   *
-   * @param newvalue
-   */
+
   synchronized void setSequenceNumber(long newvalue) {
-    if (newvalue > logSeqNum.get()) {
+    if (newvalue > logSeqNum) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("changing sequence number from " + logSeqNum + " to " +
           newvalue);
       }
-      logSeqNum.set(newvalue);
+      logSeqNum = newvalue;
     }
   }
 
   /**
    * Roll the log writer. That is, start writing log messages to a new file.
-   *
-   * Because a log cannot be rolled during a cache flush, and a cache flush
-   * spans two method calls, a special lock needs to be obtained so that a
-   * cache flush cannot start when the log is being rolled and the log cannot
-   * be rolled during a cache flush.
-   *
-   * Note that this method cannot be synchronized because it is possible that
-   * startCacheFlush runs, obtaining the cacheFlushLock, then this method could
-   * start which would obtain the lock on this but block on obtaining the
-   * cacheFlushLock and then completeCacheFlush could be called which would
-   * wait for the lock on this and consequently never release the cacheFlushLock
+   *
+   * The 'rollLock' prevents us from entering rollWriter() more than
+   * once at a time.
+   *
+   * The 'this' lock limits access to the current writer so that
+   * we do not append multiple items simultaneously.
    *
    * @throws IOException
    */
   void rollWriter() throws IOException {
-    if(closed) {
-      throw new IOException("Cannot roll log; log is closed");
-    }
+    synchronized(rollLock) {
 
-    cacheFlushLock.lock(); // prevent cache flushes
-    try {
-      // Now that we have locked out cache flushes, lock this to prevent other
-      // changes.
+      // Try to roll the writer to a new file. We may have to
+      // wait for a cache-flush to complete. In the process,
+      // compute a list of old log files that can be deleted.
+
+      Vector<Path> toDeleteList = new Vector<Path>();
+      synchronized(this) {
+        if(closed) {
+          throw new IOException("Cannot roll log; log is closed");
+        }
 
-      synchronized (this) {
-        if (writer != null) { // Close the current writer (if any), get a new one.
+        // Make sure we do not roll the log while inside a
+        // cache-flush. Otherwise, the log sequence number for
+        // the CACHEFLUSH operation will appear in a "newer" log file
+        // than it should.
+        while(insideCacheFlush) {
+          try {
+            wait();
+          } catch (InterruptedException ie) {
+            // continue
+          }
+        }
+
+        // Close the current writer (if any), and grab a new one.
+        if(writer != null) {
           writer.close();
           Path p = computeFilename(filenum - 1);
           if(LOG.isDebugEnabled()) {
             LOG.debug("Closing current log writer " + p.toString() +
-              " to get a new one");
+                " to get a new one");
           }
           if (filenum > 0) {
-            outputfiles.put(logSeqNum.get() - 1, p);
+            outputfiles.put(logSeqNum - 1, p);
           }
         }
         Path newPath = computeFilename(filenum++);
-        this.writer = SequenceFile.createWriter(fs, conf, newPath, HLogKey.class,
-          HLogEdit.class);
-
-        if (LOG.isDebugEnabled()) {
+        this.writer = SequenceFile.createWriter(fs, conf, newPath,
+          HLogKey.class, HLogEdit.class);
+        if(LOG.isDebugEnabled()) {
           LOG.debug("new log writer created at " + newPath);
         }
-
+
         // Can we delete any of the old log files?
         // First, compute the oldest relevant log operation
        // over all the regions.
 
         long oldestOutstandingSeqNum = Long.MAX_VALUE;
-        for (Long l: lastSeqWritten.values()) {
+        for(Long l: regionToLastFlush.values()) {
           long curSeqNum = l.longValue();
-
-          if (curSeqNum < oldestOutstandingSeqNum) {
+
+          if(curSeqNum < oldestOutstandingSeqNum) {
             oldestOutstandingSeqNum = curSeqNum;
           }
         }
 
-        // Get the set of all sequence numbers that are older than the oldest
-        // pending region operation
-
-        TreeSet<Long> sequenceNumbers = new TreeSet<Long>();
-        sequenceNumbers.addAll(
-          outputfiles.headMap(oldestOutstandingSeqNum).keySet());
-
-        // Remove all files with a final ID that's older than the oldest
-        // pending region-operation.
-
-        for (Long seq: sequenceNumbers) {
-          Path p = outputfiles.remove(seq);
-          if(LOG.isDebugEnabled()) {
-            LOG.debug("removing old log file " + p.toString());
+        // Next, remove all files with a final ID that's older
+        // than the oldest pending region-operation.
+        for(Iterator<Long> it = outputfiles.keySet().iterator(); it.hasNext();) {
+          long maxSeqNum = it.next().longValue();
+          if(maxSeqNum < oldestOutstandingSeqNum) {
+            Path p = outputfiles.get(maxSeqNum);
+            it.remove();
+            toDeleteList.add(p);
+
+          } else {
+            break;
           }
-          fs.delete(p);
         }
-        this.numEntries.set(0);
       }
-
-    } finally {
-      cacheFlushLock.unlock();
+
+      // Actually delete them, if any!
+      for(Iterator<Path> it = toDeleteList.iterator(); it.hasNext(); ) {
+        Path p = it.next();
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("removing old log file " + p.toString());
+        }
+        fs.delete(p);
+      }
+      this.numEntries.set(0);
     }
   }
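A note on the pruning loop above: because outputfiles is a TreeMap keyed by each closed file's highest sequence number, iterating in ascending order and breaking at the first key that is not older than oldestOutstandingSeqNum visits exactly the deletable files. The headMap view used by the removed code expresses the same set. A self-contained sketch of that equivalence, with paths reduced to strings and all names hypothetical:

```java
import java.util.*;

public class PruneSketch {
  // Return and remove the logs whose highest seqnum is older than every
  // outstanding (unflushed) edit: the same set the patch's iterator/break
  // loop computes, written with TreeMap.headMap instead.
  static List<String> obsoleteLogs(TreeMap<Long, String> outputfiles,
      long oldestOutstandingSeqNum) {
    // headMap is a live view of all keys strictly below the bound.
    SortedMap<Long, String> head = outputfiles.headMap(oldestOutstandingSeqNum);
    List<String> doomed = new ArrayList<String>(head.values());
    head.clear();  // clearing the view removes the entries from outputfiles
    return doomed;
  }

  public static void main(String[] args) {
    TreeMap<Long, String> files = new TreeMap<Long, String>();
    files.put(9L, "hlog.0");   // holds edits up to seqnum 9
    files.put(25L, "hlog.1");  // holds edits up to seqnum 25
    files.put(60L, "hlog.2");  // holds edits up to seqnum 60
    // Oldest edit not yet flushed by any region: 30.
    System.out.println(obsoleteLogs(files, 30L));  // [hlog.0, hlog.1]
    System.out.println(files);                     // {60=hlog.2}
  }
}
```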
@@ -333,9 +328,7 @@ public class HLog implements HConstants {
    * other systems should process the log appropriately upon each startup
    * (and prior to initializing HLog).
    *
-   * synchronized prevents appends during the completion of a cache flush or
-   * for the duration of a log roll.
-   *
+   * We need to seize a lock on the writer so that writes are atomic.
    * @param regionName
    * @param tableName
    * @param row
@@ -344,19 +337,21 @@ public class HLog implements HConstants {
    * @throws IOException
    */
   synchronized void append(Text regionName, Text tableName, Text row,
-    TreeMap<Text, byte []> columns, long timestamp) throws IOException {
+    TreeMap<Text, byte []> columns, long timestamp)
+  throws IOException {
     if(closed) {
       throw new IOException("Cannot append; log is closed");
     }
-
-    long seqNum[] = obtainSeqNum(columns.size());
-
-    // The 'lastSeqWritten' map holds the sequence number of the most recent
-    // write for each region. When the cache is flushed, the entry for the
-    // region being flushed is removed if the sequence number of the flush
-    // is greater than or equal to the value in lastSeqWritten
-
-    lastSeqWritten.put(regionName, seqNum[seqNum.length - 1]);
+    long seqNum[] = obtainSeqNum(columns.size());
+
+    // The 'regionToLastFlush' map holds the sequence id of the most
+    // recent flush for every regionName. However, for regions that
+    // have not been flushed yet, the relevant operation is the first
+    // one that has been added.
+    if (regionToLastFlush.get(regionName) == null) {
+      regionToLastFlush.put(regionName, seqNum[0]);
+    }
 
     int counter = 0;
     for (Map.Entry<Text, byte []> es: columns.entrySet()) {
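To make the bookkeeping concrete: after this change, regionToLastFlush holds, for each region, either the sequence id of its last completed flush or, for a region with buffered edits and no flush yet, the sequence id of its first buffered edit. Either way the value is a lower bound on what that region might still need from the log. A tiny worked example with made-up numbers, Text keys replaced by strings:

```java
import java.util.*;

public class FlushBookkeeping {
  public static void main(String[] args) {
    TreeMap<String, Long> regionToLastFlush = new TreeMap<String, Long>();

    // Region "A" appended edits, then completed a cache flush at seqnum 40:
    regionToLastFlush.put("A", 40L);
    // Region "B" has appended since seqnum 30 and has not flushed yet;
    // append() records only the first such edit:
    if (regionToLastFlush.get("B") == null) {
      regionToLastFlush.put("B", 30L);
    }

    // rollWriter() takes the minimum over all regions: every closed log
    // whose highest seqnum is below it is unneeded for recovery.
    long oldest = Long.MAX_VALUE;
    for (Long l : regionToLastFlush.values()) {
      oldest = Math.min(oldest, l.longValue());
    }
    System.out.println("oldest outstanding seqnum = " + oldest);  // 30
  }
}
```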
@@ -368,39 +363,29 @@ public class HLog implements HConstants {
     }
   }
 
-  /**
-   * @return How many items have been added to the log
-   *
-   * Because numEntries is an AtomicInteger, no locking is required.
-   */
+  /** @return How many items have been added to the log */
   int getNumEntries() {
     return numEntries.get();
   }
 
   /**
-   * Obtain a log sequence number.
-   *
-   * Because it is only called from a synchronized method, no additional locking
-   * is required.
+   * Obtain a log sequence number. This seizes the whole HLog
+   * lock, but it shouldn't last too long.
    */
-  private long obtainSeqNum() {
-    return logSeqNum.getAndIncrement();
+  synchronized long obtainSeqNum() {
+    return logSeqNum++;
   }
 
   /**
    * Obtain a specified number of sequence numbers
    *
-   * Because it is only called from a synchronized method, no additional locking
-   * is required.
-   *
    * @param num - number of sequence numbers to obtain
    * @return - array of sequence numbers
    */
-  private long[] obtainSeqNum(int num) {
-    long sequenceNumber = logSeqNum.getAndAdd(num);
+  synchronized long[] obtainSeqNum(int num) {
     long[] results = new long[num];
     for (int i = 0; i < num; i++) {
-      results[i] = sequenceNumber++;
+      results[i] = logSeqNum++;
     }
     return results;
   }
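Because both obtainSeqNum variants now hold the HLog monitor for their whole body, a batch of num sequence numbers stays contiguous and two concurrent appends can never interleave their blocks; that is what lets append() treat seqNum[0] as the region's first unflushed edit. The same allocator in isolation, as a hypothetical class rather than code from the patch:

```java
// Standalone sketch of the monitor-guarded sequence allocator (assumed names).
class SeqAllocator {
  private long next = 0;

  synchronized long obtain() {
    return next++;
  }

  // Holding the monitor for the whole loop keeps each block contiguous,
  // so a caller's numbers are results[0] .. results[0] + num - 1.
  synchronized long[] obtain(int num) {
    long[] results = new long[num];
    for (int i = 0; i < num; i++) {
      results[i] = next++;
    }
    return results;
  }
}
```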
@@ -409,50 +394,54 @@ public class HLog implements HConstants {
    * By acquiring a log sequence ID, we can allow log messages
    * to continue while we flush the cache.
    *
-   * Acquire a lock so that we do not roll the log between the start
-   * and completion of a cache-flush. Otherwise the log-seq-id for
+   * Set a flag so that we do not roll the log between the start
+   * and completion of a cache-flush. Otherwise the log-seq-id for
    * the flush will not appear in the correct logfile.
-   *
    * @return sequence ID to pass {@link #completeCacheFlush(Text, Text, long)}
    * @see #completeCacheFlush(Text, Text, long)
    * @see #abortCacheFlush()
    */
   synchronized long startCacheFlush() {
-    cacheFlushLock.lock();
+    while (this.insideCacheFlush) {
+      try {
+        wait();
+      } catch (InterruptedException ie) {
+        // continue
+      }
+    }
+    this.insideCacheFlush = true;
+    notifyAll();
     return obtainSeqNum();
   }
 
-  /**
-   * Complete the cache flush
-   *
-   * Protected by this.lock()
-   *
+  /** Complete the cache flush
    * @param regionName
    * @param tableName
    * @param logSeqId
    * @throws IOException
    */
   synchronized void completeCacheFlush(final Text regionName,
-    final Text tableName, final long logSeqId) throws IOException {
+    final Text tableName, final long logSeqId)
+  throws IOException {
+    if(this.closed) {
+      return;
+    }
+
+    if (!this.insideCacheFlush) {
+      throw new IOException("Impossible situation: inside " +
+        "completeCacheFlush(), but 'insideCacheFlush' flag is false");
+    }
+    HLogKey key = new HLogKey(regionName, tableName, HLog.METAROW, logSeqId);
+    this.writer.append(key,
+      new HLogEdit(HLog.METACOLUMN, HGlobals.completeCacheFlush.get(),
+        System.currentTimeMillis()));
+    this.numEntries.getAndIncrement();
 
-    try {
-      if(this.closed) {
-        return;
-      }
-
-      writer.append(new HLogKey(regionName, tableName, HLog.METAROW, logSeqId),
-        new HLogEdit(HLog.METACOLUMN, HGlobals.completeCacheFlush.get(),
-          System.currentTimeMillis()));
-
-      numEntries.getAndIncrement();
-      Long seq = lastSeqWritten.get(regionName);
-      if (seq != null && logSeqId >= seq) {
-        lastSeqWritten.remove(regionName);
-      }
+    // Remember the most-recent flush for each region.
+    // This is used to delete obsolete log files.
+    this.regionToLastFlush.put(regionName, Long.valueOf(logSeqId));
 
-    } finally {
-      cacheFlushLock.unlock();
-    }
+    cleanup();
   }
 
   /**
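Taken together, the flush protocol a caller (the memcache flusher) is expected to follow looks roughly like this. The HLog methods come from the patch; the surrounding class, flushMemcacheToHStore(), and the variables are hypothetical stand-ins:

```java
import java.io.IOException;
import org.apache.hadoop.io.Text;

// Hypothetical caller sketching the intended protocol, not code from the patch.
class FlushCaller {
  void flushRegion(HLog hlog, Text regionName, Text tableName)
  throws IOException {
    long seqId = hlog.startCacheFlush();  // blocks while another flush runs
    try {
      flushMemcacheToHStore(regionName);  // hypothetical: persist the snapshot
      // Writes the CACHEFLUSH marker and records the flush point for pruning.
      hlog.completeCacheFlush(regionName, tableName, seqId);
    } catch (IOException e) {
      hlog.abortCacheFlush();  // clear insideCacheFlush so rolls/flushes resume
      throw e;
    }
  }

  private void flushMemcacheToHStore(Text regionName) throws IOException {
    // stand-in for the real flush of the memcache snapshot to an HStore
  }
}
```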
@@ -462,8 +451,23 @@ public class HLog implements HConstants {
    * is a restart of the regionserver so the snapshot content dropped by the
    * failure gets restored to the memcache.
    */
-  void abortCacheFlush() {
-    this.cacheFlushLock.unlock();
+  synchronized void abortCacheFlush() {
+    cleanup();
+  }
+
+  private synchronized void cleanup() {
+    this.insideCacheFlush = false;
+    notifyAll();
+  }
+
+  /**
+   * Abort a cache flush.
+   * This method clears waits on {@link #insideCacheFlush}, but if it is
+   * called we are losing data. TODO: Fix.
+   */
+  synchronized void abort() {
+    this.insideCacheFlush = false;
+    notifyAll();
   }
 
   private static void usage() {