@@ -19,91 +19,124 @@
  */
 package org.apache.hadoop.hbase;
 
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.SequenceFile.Reader;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.conf.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * HLog stores all the edits to the HStore.
- *
- * It performs logfile-rolling, so external callers are not aware that the
+ *
+ * It performs logfile-rolling, so external callers are not aware that the
  * underlying file is being rolled.
  *
- * <p>A single HLog is used by several HRegions simultaneously.
- *
- * <p>Each HRegion is identified by a unique long <code>int</code>. HRegions do
+ * <p>
+ * A single HLog is used by several HRegions simultaneously.
+ *
+ * <p>
+ * Each HRegion is identified by a unique <code>long</code>. HRegions do
  * not need to declare themselves before using the HLog; they simply include
- * their HRegion-id in the <code>append</code> or
+ * their HRegion-id in the <code>append</code> or
  * <code>completeCacheFlush</code> calls.
  *
- * <p>An HLog consists of multiple on-disk files, which have a chronological
- * order. As data is flushed to other (better) on-disk structures, the log
- * becomes obsolete. We can destroy all the log messages for a given
- * HRegion-id up to the most-recent CACHEFLUSH message from that HRegion.
+ * <p>
+ * An HLog consists of multiple on-disk files, which have a chronological order.
+ * As data is flushed to other (better) on-disk structures, the log becomes
+ * obsolete. We can destroy all the log messages for a given HRegion-id up to
+ * the most-recent CACHEFLUSH message from that HRegion.
+ *
+ * <p>
+ * It's only practical to delete entire files. Thus, we delete an entire on-disk
+ * file F when all of the messages in F have a log-sequence-id that's older
+ * (smaller) than the most-recent CACHEFLUSH message for every HRegion that has
+ * a message in F.
+ *
+ * <p>
+ * Synchronized methods can never execute in parallel. However, between the
+ * start of a cache flush and the completion point, appends are allowed but log
+ * rolling is not. To prevent log rolling taking place during this period, a
+ * separate reentrant lock is used.
  *
- * <p>It's only practical to delete entire files. Thus, we delete an entire
- * on-disk file F when all of the messages in F have a log-sequence-id that's
- * older (smaller) than the most-recent CACHEFLUSH message for every HRegion
- * that has a message in F.
- *
- * <p>TODO: Vuk Ercegovac also pointed out that keeping HBase HRegion edit logs
- * in HDFS is currently flawed. HBase writes edits to logs and to a memcache.
- * The 'atomic' write to the log is meant to serve as insurance against
- * abnormal RegionServer exit: on startup, the log is rerun to reconstruct an
- * HRegion's last wholesome state. But files in HDFS do not 'exist' until they
- * are cleanly closed -- something that will not happen if RegionServer exits
- * without running its 'close'.
+ * <p>
+ * TODO: Vuk Ercegovac also pointed out that keeping HBase HRegion edit logs in
+ * HDFS is currently flawed. HBase writes edits to logs and to a memcache. The
+ * 'atomic' write to the log is meant to serve as insurance against abnormal
+ * RegionServer exit: on startup, the log is rerun to reconstruct an HRegion's
+ * last wholesome state. But files in HDFS do not 'exist' until they are cleanly
+ * closed -- something that will not happen if RegionServer exits without
+ * running its 'close'.
  */
 public class HLog implements HConstants {
   private static final Log LOG = LogFactory.getLog(HLog.class);
-
+
   static final String HLOG_DATFILE = "hlog.dat.";
+
   static final Text METACOLUMN = new Text("METACOLUMN:");
+
   static final Text METAROW = new Text("METAROW");
 
   FileSystem fs;
+
   Path dir;
+
   Configuration conf;
 
+  final long threadWakeFrequency;
+
   SequenceFile.Writer writer;
+
   TreeMap<Long, Path> outputfiles = new TreeMap<Long, Path>();
-  volatile boolean insideCacheFlush = false;
 
-  TreeMap<Text, Long> regionToLastFlush = new TreeMap<Text, Long>();
+  HashMap<Text, Long> lastSeqWritten = new HashMap<Text, Long>();
 
   volatile boolean closed = false;
+
+  private final Integer sequenceLock = new Integer(0);
   volatile long logSeqNum = 0;
-  long filenum = 0;
-  AtomicInteger numEntries = new AtomicInteger(0);
 
-  Integer rollLock = new Integer(0);
+  volatile long filenum = 0;
+
+  volatile int numEntries = 0;
+
+  // This lock prevents starting a log roll during a cache flush.
+  // synchronized is insufficient because a cache flush spans two method calls.
+  private final Lock cacheFlushLock = new ReentrantLock();
 
   /**
-   * Split up a bunch of log files, that are no longer being written to,
-   * into new files, one per region. Delete the old log files when ready.
+   * Split up a bunch of log files that are no longer being written to, into
+   * new files, one per region. Delete the old log files when finished.
+   *
    * @param rootDir Root directory of the HBase instance
-   * @param srcDir Directory of log files to split:
-   * e.g. <code>${ROOTDIR}/log_HOST_PORT</code>
+   * @param srcDir Directory of log files to split: e.g.
+   *          <code>${ROOTDIR}/log_HOST_PORT</code>
    * @param fs FileSystem
    * @param conf HBaseConfiguration
    * @throws IOException
    */
   static void splitLog(Path rootDir, Path srcDir, FileSystem fs,
-    Configuration conf) throws IOException {
-    Path logfiles[] = fs.listPaths(new Path[] {srcDir});
+      Configuration conf) throws IOException {
+    Path logfiles[] = fs.listPaths(new Path[] { srcDir });
     LOG.info("splitting " + logfiles.length + " log(s) in " +
       srcDir.toString());
     HashMap<Text, SequenceFile.Writer> logWriters =
       new HashMap<Text, SequenceFile.Writer>();
     try {
-      for(int i = 0; i < logfiles.length; i++) {
+      for (int i = 0; i < logfiles.length; i++) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Splitting " + logfiles[i]);
         }
@@ -118,7 +151,7 @@ public class HLog implements HConstants {
         try {
           HLogKey key = new HLogKey();
           HLogEdit val = new HLogEdit();
-          while(in.next(key, val)) {
+          while (in.next(key, val)) {
             Text regionName = key.getRegionName();
             SequenceFile.Writer w = logWriters.get(regionName);
             if (w == null) {
@@ -141,15 +174,15 @@ public class HLog implements HConstants {
         }
       }
     } finally {
-      for (SequenceFile.Writer w: logWriters.values()) {
+      for (SequenceFile.Writer w : logWriters.values()) {
         w.close();
       }
     }
-
-    if(fs.exists(srcDir)) {
-      if(! fs.delete(srcDir)) {
+
+    if (fs.exists(srcDir)) {
+      if (!fs.delete(srcDir)) {
         LOG.error("Cannot delete: " + srcDir);
-        if(! FileUtil.fullyDelete(new File(srcDir.toString()))) {
+        if (!FileUtil.fullyDelete(new File(srcDir.toString()))) {
           throw new IOException("Cannot delete: " + srcDir);
         }
       }
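
The splitting loop above demultiplexes one mixed stream of (HLogKey, HLogEdit) records into one output per region, lazily opening a writer the first time a region is seen. A minimal, self-contained sketch of that pattern (plain strings stand in for Text region names, and a StringBuilder stands in for each per-region SequenceFile.Writer; both are stand-ins, not HBase API):

import java.util.HashMap;
import java.util.Map;

public class SplitSketch {
  public static void main(String[] args) {
    // A replayed log: (region, edit) pairs in arrival order.
    String[][] log = {
        { "regionA", "edit1" }, { "regionB", "edit2" }, { "regionA", "edit3" } };
    Map<String, StringBuilder> writers = new HashMap<String, StringBuilder>();
    for (String[] entry : log) {
      // Lazily open one writer per region, as splitLog does for
      // SequenceFile.Writer instances keyed by region name.
      StringBuilder w = writers.get(entry[0]);
      if (w == null) {
        w = new StringBuilder();
        writers.put(entry[0], w);
      }
      w.append(entry[1]).append('\n');
    }
    System.out.println(writers.size() + " per-region logs"); // 2 per-region logs
  }
}
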
@@ -160,10 +193,10 @@ public class HLog implements HConstants {
   /**
    * Create an edit log at the given <code>dir</code> location.
    *
-   * You should never have to load an existing log. If there is a log
-   * at startup, it should have already been processed and deleted by
-   * the time the HLog object is started up.
-   *
+   * You should never have to load an existing log. If there is a log at
+   * startup, it should have already been processed and deleted by the time the
+   * HLog object is started up.
+   *
    * @param fs
    * @param dir
    * @param conf
@@ -173,6 +206,7 @@ public class HLog implements HConstants {
     this.fs = fs;
     this.dir = dir;
     this.conf = conf;
+    this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
 
     if (fs.exists(dir)) {
       throw new IOException("Target HLog directory already exists: " + dir);
@@ -180,115 +214,117 @@ public class HLog implements HConstants {
     fs.mkdirs(dir);
     rollWriter();
   }
-
-  synchronized void setSequenceNumber(long newvalue) {
-    if (newvalue > logSeqNum) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("changing sequence number from " + logSeqNum + " to " +
-          newvalue);
+
+  /**
+   * Called by HRegionServer when it opens a new region to ensure that log
+   * sequence numbers are always greater than the latest sequence number of the
+   * region being brought on-line.
+   *
+   * @param newvalue
+   */
+  void setSequenceNumber(long newvalue) {
+    synchronized (sequenceLock) {
+      if (newvalue > logSeqNum) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("changing sequence number from " + logSeqNum + " to " +
+            newvalue);
+        }
+        logSeqNum = newvalue;
       }
-      logSeqNum = newvalue;
     }
   }
 
   /**
-   * Roll the log writer. That is, start writing log messages to a new file.
+   * Roll the log writer. That is, start writing log messages to a new file.
+   *
+   * Because a log cannot be rolled during a cache flush, and a cache flush
+   * spans two method calls, a special lock needs to be obtained so that a cache
+   * flush cannot start when the log is being rolled and the log cannot be
+   * rolled during a cache flush.
    *
-   * The 'rollLock' prevents us from entering rollWriter() more than
-   * once at a time.
+   * Note that this method cannot simply block on the cacheFlushLock: if
+   * startCacheFlush ran first and took the cacheFlushLock, this method would
+   * hold the lock on this while blocking on the cacheFlushLock, and
+   * completeCacheFlush would then wait forever for the lock on this, never
+   * releasing the cacheFlushLock. Hence the tryLock/wait loop below.
    *
-   * The 'this' lock limits access to the current writer so
-   * we don't append multiple items simultaneously.
-   *
    * @throws IOException
    */
-  void rollWriter() throws IOException {
-    synchronized(rollLock) {
-
-      // Try to roll the writer to a new file. We may have to
-      // wait for a cache-flush to complete. In the process,
-      // compute a list of old log files that can be deleted.
+  synchronized void rollWriter() throws IOException {
+    boolean locked = false;
+    while (!locked && !closed) {
+      if (cacheFlushLock.tryLock()) {
+        locked = true;
+        break;
+      }
+      try {
+        this.wait(threadWakeFrequency);
+      } catch (InterruptedException e) {
+      }
+    }
+    if (closed) {
+      if (locked) {
+        cacheFlushLock.unlock();
+      }
+      throw new IOException("Cannot roll log; log is closed");
+    }
 
-    Vector<Path> toDeleteList = new Vector<Path>();
-    synchronized(this) {
-      if(closed) {
-        throw new IOException("Cannot roll log; log is closed");
-      }
+    // If we get here we have locked out both cache flushes and appends
 
-      // Make sure we do not roll the log while inside a
-      // cache-flush. Otherwise, the log sequence number for
-      // the CACHEFLUSH operation will appear in a "newer" log file
-      // than it should.
-      while(insideCacheFlush) {
-        try {
-          wait();
-        } catch (InterruptedException ie) {
-          // continue;
-        }
-      }
-
-      // Close the current writer (if any), and grab a new one.
-      if(writer != null) {
-        writer.close();
-        Path p = computeFilename(filenum - 1);
-        if(LOG.isDebugEnabled()) {
-          LOG.debug("Closing current log writer " + p.toString() +
+    try {
+      if (writer != null) {
+        // Close the current writer, get a new one.
+        writer.close();
+        Path p = computeFilename(filenum - 1);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Closing current log writer " + p.toString() +
             " to get a new one");
-        }
-        if (filenum > 0) {
+        }
+        if (filenum > 0) {
+          synchronized (sequenceLock) {
             outputfiles.put(logSeqNum - 1, p);
           }
         }
-      Path newPath = computeFilename(filenum++);
-      this.writer = SequenceFile.createWriter(fs, conf, newPath,
+      }
+      Path newPath = computeFilename(filenum++);
+      this.writer = SequenceFile.createWriter(fs, conf, newPath,
         HLogKey.class, HLogEdit.class);
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("new log writer created at " + newPath);
-      }
-
-      // Can we delete any of the old log files?
-      // First, compute the oldest relevant log operation
-      // over all the regions.
-
-      long oldestOutstandingSeqNum = Long.MAX_VALUE;
-      for(Long l: regionToLastFlush.values()) {
-        long curSeqNum = l.longValue();
-
-        if(curSeqNum < oldestOutstandingSeqNum) {
-          oldestOutstandingSeqNum = curSeqNum;
-        }
-      }
 
-      // Next, remove all files with a final ID that's older
-      // than the oldest pending region-operation.
-      for(Iterator<Long> it = outputfiles.keySet().iterator(); it.hasNext();) {
-        long maxSeqNum = it.next().longValue();
-        if(maxSeqNum < oldestOutstandingSeqNum) {
-          Path p = outputfiles.get(maxSeqNum);
-          it.remove();
-          toDeleteList.add(p);
-
-        } else {
-          break;
-        }
-      }
-    }
+      LOG.info("new log writer created at " + newPath);
+
+      // Can we delete any of the old log files?
+
+      TreeSet<Long> sequenceNumbers =
+        new TreeSet<Long>(lastSeqWritten.values());
+
+      if (sequenceNumbers.size() > 0) {
+        long oldestOutstandingSeqNum = sequenceNumbers.first();
+
+        // Get the set of all log files whose final ID is older than the oldest
+        // pending region operation
+
+        sequenceNumbers.clear();
+        sequenceNumbers.addAll(outputfiles.headMap(
+          oldestOutstandingSeqNum).keySet());
 
-    // Actually delete them, if any!
-    for(Iterator<Path> it = toDeleteList.iterator(); it.hasNext(); ) {
-      Path p = it.next();
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("removing old log file " + p.toString());
+        // Now remove old log files (if any)
+
+        for (Long seq : sequenceNumbers) {
+          Path p = outputfiles.remove(seq);
+          LOG.info("removing old log file " + p.toString());
+          fs.delete(p);
         }
-      fs.delete(p);
       }
-    this.numEntries.set(0);
+      this.numEntries = 0;
+
+    } finally {
+      cacheFlushLock.unlock();
     }
   }
 
   /**
-   * This is a convenience method that computes a new filename with
-   * a given file-number.
+   * This is a convenience method that computes a new filename with a given
+   * file-number.
    */
   Path computeFilename(final long fn) {
     return new Path(dir, HLOG_DATFILE + String.format("%1$03d", fn));
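
The cleanup logic in rollWriter above implements the rule from the class comment: a log file is obsolete once every region has flushed past the file's highest sequence number. A minimal sketch of the arithmetic, assuming the same data shapes (lastSeqWritten mapping a region to its oldest un-flushed write, outputfiles keyed by each file's final sequence number; the region and file names here are invented):

import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;

public class CleanupSketch {
  public static void main(String[] args) {
    // Per-region sequence number of the most recent un-flushed write.
    TreeMap<String, Long> lastSeqWritten = new TreeMap<String, Long>();
    lastSeqWritten.put("regionA", 17L);
    lastSeqWritten.put("regionB", 42L);

    // Log files keyed by the highest sequence number each one contains.
    TreeMap<Long, String> outputfiles = new TreeMap<Long, String>();
    outputfiles.put(9L, "hlog.dat.000");
    outputfiles.put(23L, "hlog.dat.001");
    outputfiles.put(55L, "hlog.dat.002");

    // A file is deletable only if its newest edit is older than the oldest
    // outstanding edit of any region -- exactly the headMap call above.
    long oldestOutstanding = new TreeSet<Long>(lastSeqWritten.values()).first();
    SortedMap<Long, String> obsolete = outputfiles.headMap(oldestOutstanding);
    System.out.println(obsolete.values()); // prints [hlog.dat.000]
  }
}
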
@@ -296,19 +332,21 @@ public class HLog implements HConstants {
 
   /**
    * Shut down the log and delete the log directory
+   *
    * @throws IOException
    */
   synchronized void closeAndDelete() throws IOException {
     close();
     fs.delete(dir);
   }
-
+
   /**
    * Shut down the log.
+   *
    * @throws IOException
    */
   synchronized void close() throws IOException {
-    if(LOG.isDebugEnabled()) {
+    if (LOG.isDebugEnabled()) {
       LOG.debug("closing log writer in " + this.dir.toString());
     }
     this.writer.close();
@@ -319,16 +357,19 @@ public class HLog implements HConstants {
    * Append a set of edits to the log. Log edits are keyed by regionName,
    * rowname, and log-sequence-id.
    *
-   * Later, if we sort by these keys, we obtain all the relevant edits for
-   * a given key-range of the HRegion (TODO). Any edits that do not have a
+   * Later, if we sort by these keys, we obtain all the relevant edits for a
+   * given key-range of the HRegion (TODO). Any edits that do not have a
    * matching {@link HConstants#COMPLETE_CACHEFLUSH} message can be discarded.
    *
-   * <p>Logs cannot be restarted once closed, or once the HLog process dies.
-   * Each time the HLog starts, it must create a new log. This means that
-   * other systems should process the log appropriately upon each startup
-   * (and prior to initializing HLog).
+   * <p>
+   * Logs cannot be restarted once closed, or once the HLog process dies. Each
+   * time the HLog starts, it must create a new log. This means that other
+   * systems should process the log appropriately upon each startup (and prior
+   * to initializing HLog).
+   *
+   * <code>synchronized</code> prevents appends during the completion of a
+   * cache flush or for the duration of a log roll.
    *
-   * We need to seize a lock on the writer so that writes are atomic.
    * @param regionName
    * @param tableName
    * @param row
@@ -337,136 +378,121 @@ public class HLog implements HConstants {
    * @throws IOException
    */
   synchronized void append(Text regionName, Text tableName, Text row,
-    TreeMap<Text, byte []> columns, long timestamp)
-  throws IOException {
-    if(closed) {
+      TreeMap<Text, byte[]> columns, long timestamp) throws IOException {
+    if (closed) {
       throw new IOException("Cannot append; log is closed");
     }
-
+
     long seqNum[] = obtainSeqNum(columns.size());
 
-    // The 'regionToLastFlush' map holds the sequence id of the
-    // most recent flush for every regionName. However, for regions
-    // that don't have any flush yet, the relevant operation is the
-    // first one that's been added.
-    if (regionToLastFlush.get(regionName) == null) {
-      regionToLastFlush.put(regionName, seqNum[0]);
-    }
+    // The 'lastSeqWritten' map holds the sequence number of the most recent
+    // write for each region. When the cache is flushed, the entry for the
+    // region being flushed is removed if the sequence number of the flush
+    // is greater than or equal to the value in lastSeqWritten
+
+    lastSeqWritten.put(regionName, seqNum[seqNum.length - 1]);
 
     int counter = 0;
-    for (Map.Entry<Text, byte []> es: columns.entrySet()) {
+    for (Map.Entry<Text, byte[]> es : columns.entrySet()) {
       HLogKey logKey =
         new HLogKey(regionName, tableName, row, seqNum[counter++]);
       HLogEdit logEdit = new HLogEdit(es.getKey(), es.getValue(), timestamp);
       writer.append(logKey, logEdit);
-      numEntries.getAndIncrement();
+      numEntries++;
     }
   }
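
One append call thus fans out to one (HLogKey, HLogEdit) pair per column, each with its own sequence number from a single block allocation, and only the last number is remembered in lastSeqWritten. A self-contained sketch of that numbering (strings replace Text and byte[]; the region, row, and column names are invented):

import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class AppendSketch {
  public static void main(String[] args) {
    long logSeqNum = 100;
    Map<String, Long> lastSeqWritten = new HashMap<String, Long>();

    TreeMap<String, String> columns = new TreeMap<String, String>();
    columns.put("colA", "v1");
    columns.put("colB", "v2");

    // Allocate one sequence number per column edit, as obtainSeqNum(int) does.
    long[] seq = new long[columns.size()];
    for (int i = 0; i < seq.length; i++) {
      seq[i] = logSeqNum++;
    }
    // Record the newest write for the region, as append() does.
    lastSeqWritten.put("regionA", seq[seq.length - 1]);

    int counter = 0;
    for (Map.Entry<String, String> es : columns.entrySet()) {
      System.out.println("key=(regionA, row1, " + seq[counter++] + ") edit="
          + es.getKey() + "=" + es.getValue());
    }
    // Prints sequence numbers 100 and 101; lastSeqWritten holds 101.
  }
}
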
 
   /** @return How many items have been added to the log */
   int getNumEntries() {
-    return numEntries.get();
+    return numEntries;
   }
 
   /**
-   * Obtain a log sequence number. This seizes the whole HLog
-   * lock, but it shouldn't last too long.
+   * Obtain a log sequence number.
    */
-  synchronized long obtainSeqNum() {
-    return logSeqNum++;
+  private long obtainSeqNum() {
+    long value;
+    synchronized (sequenceLock) {
+      value = logSeqNum++;
+    }
+    return value;
   }
-
+
   /**
    * Obtain a specified number of sequence numbers
-   *
-   * @param num - number of sequence numbers to obtain
-   * @return - array of sequence numbers
+   *
+   * @param num number of sequence numbers to obtain
+   * @return array of sequence numbers
    */
-  synchronized long[] obtainSeqNum(int num) {
+  private long[] obtainSeqNum(int num) {
     long[] results = new long[num];
-    for (int i = 0; i < num; i++) {
-      results[i] = logSeqNum++;
+    synchronized (sequenceLock) {
+      for (int i = 0; i < num; i++) {
+        results[i] = logSeqNum++;
+      }
     }
     return results;
   }
 
   /**
-   * By acquiring a log sequence ID, we can allow log messages
-   * to continue while we flush the cache.
+   * By acquiring a log sequence ID, we can allow log messages to continue while
+   * we flush the cache.
+   *
+   * Acquire a lock so that we do not roll the log between the start and
+   * completion of a cache-flush. Otherwise the log-seq-id for the flush will
+   * not appear in the correct logfile.
    *
-   * Set a flag so that we do not roll the log between the start
-   * and complete of a cache-flush. Otherwise the log-seq-id for
-   * the flush will not appear in the correct logfile.
    * @return sequence ID to pass {@link #completeCacheFlush(Text, Text, long)}
    * @see #completeCacheFlush(Text, Text, long)
    * @see #abortCacheFlush()
    */
-  synchronized long startCacheFlush() {
-    while (this.insideCacheFlush) {
-      try {
-        wait();
-      } catch (InterruptedException ie) {
-        // continue
-      }
-    }
-    this.insideCacheFlush = true;
-    notifyAll();
+  long startCacheFlush() {
+    cacheFlushLock.lock();
     return obtainSeqNum();
   }
 
-  /** Complete the cache flush
+  /**
+   * Complete the cache flush
+   *
+   * Protected by this and cacheFlushLock
+   *
    * @param regionName
    * @param tableName
    * @param logSeqId
    * @throws IOException
    */
   synchronized void completeCacheFlush(final Text regionName,
-    final Text tableName, final long logSeqId)
-  throws IOException {
-    if(this.closed) {
-      return;
-    }
-
-    if (!this.insideCacheFlush) {
-      throw new IOException("Impossible situation: inside " +
-        "completeCacheFlush(), but 'insideCacheFlush' flag is false");
-    }
-    HLogKey key = new HLogKey(regionName, tableName, HLog.METAROW, logSeqId);
-    this.writer.append(key,
-      new HLogEdit(HLog.METACOLUMN, HGlobals.completeCacheFlush.get(),
-        System.currentTimeMillis()));
-    this.numEntries.getAndIncrement();
+      final Text tableName, final long logSeqId) throws IOException {
 
-    // Remember the most-recent flush for each region.
-    // This is used to delete obsolete log files.
-    this.regionToLastFlush.put(regionName, Long.valueOf(logSeqId));
+    try {
+      if (this.closed) {
+        return;
+      }
+
+      writer.append(new HLogKey(regionName, tableName, HLog.METAROW, logSeqId),
+        new HLogEdit(HLog.METACOLUMN, HGlobals.completeCacheFlush.get(),
+          System.currentTimeMillis()));
 
-    cleanup();
+      numEntries++;
+      Long seq = lastSeqWritten.get(regionName);
+      if (seq != null && logSeqId >= seq) {
+        lastSeqWritten.remove(regionName);
+      }
+
+    } finally {
+      cacheFlushLock.unlock();
+      notifyAll(); // wake up the log roller if it is waiting
+    }
   }
-
+
   /**
-   * Abort a cache flush.
-   * This method will clear waits on {@link #insideCacheFlush}. Call if the
-   * flush fails. Note that the only recovery for an aborted flush currently
-   * is a restart of the regionserver so the snapshot content dropped by the
-   * failure gets restored to the memcache.
+   * Abort a cache flush. This method releases the cacheFlushLock and wakes
+   * any waiting log roller. Call if the flush fails. Note that the only
+   * recovery for an aborted flush currently is a restart of the regionserver so
+   * the snapshot content dropped by the failure gets restored to the memcache.
    */
   synchronized void abortCacheFlush() {
-    cleanup();
-  }
-
-  private synchronized void cleanup() {
-    this.insideCacheFlush = false;
-    notifyAll();
-  }
-
-  /**
-   * Abort a cache flush.
-   * This method will clear waits on {@link #insideCacheFlush} but if this
-   * method is called, we are losing data. TODO: Fix.
-   */
-  synchronized void abort() {
-    this.insideCacheFlush = false;
+    this.cacheFlushLock.unlock();
     notifyAll();
   }
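
Taken together, startCacheFlush, completeCacheFlush, and abortCacheFlush form a bracket the region server's flush code is expected to follow. A sketch of the caller side, assuming a same-package caller (the methods are package-private); flushMemcacheToDisk is a hypothetical stand-in for the region's actual flush work:

package org.apache.hadoop.hbase; // hypothetical same-package caller

import java.io.IOException;

import org.apache.hadoop.io.Text;

class FlushCallerSketch {
  void flushRegion(HLog hlog, Text regionName, Text tableName)
      throws IOException {
    long sequenceId = hlog.startCacheFlush(); // takes cacheFlushLock; rolls wait
    boolean flushed = false;
    try {
      flushMemcacheToDisk(); // appends are still allowed while this runs
      flushed = true;
    } finally {
      if (flushed) {
        // Logs the COMPLETE_CACHEFLUSH marker and releases the lock.
        hlog.completeCacheFlush(regionName, tableName, sequenceId);
      } else {
        // Releases the lock; recovery currently requires a restart.
        hlog.abortCacheFlush();
      }
    }
  }

  private void flushMemcacheToDisk() {
    // Hypothetical: write the memcache snapshot to an HStoreFile.
  }
}
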
@@ -474,10 +500,11 @@ public class HLog implements HConstants {
     System.err.println("Usage: java org.apache.hbase.HLog" +
       " {--dump <logfile>... | --split <logdir>...}");
   }
-
+
   /**
    * Pass one or more log file names and it will either dump out a text version
    * on <code>stdout</code> or split the specified log files.
+   *
    * @param args
    * @throws IOException
    */
@@ -490,7 +517,7 @@ public class HLog implements HConstants {
     if (args[0].compareTo("--dump") != 0) {
       if (args[0].compareTo("--split") == 0) {
         dump = false;
-
+
       } else {
         usage();
         System.exit(-1);
@@ -499,7 +526,7 @@ public class HLog implements HConstants {
     Configuration conf = new HBaseConfiguration();
     FileSystem fs = FileSystem.get(conf);
     Path baseDir = new Path(conf.get(HBASE_DIR, DEFAULT_HBASE_DIR));
-
+
     for (int i = 1; i < args.length; i++) {
       Path logPath = new Path(args[i]);
       if (!fs.exists(logPath)) {
@@ -513,7 +540,7 @@ public class HLog implements HConstants {
     try {
       HLogKey key = new HLogKey();
       HLogEdit val = new HLogEdit();
-      while(log.next(key, val)) {
+      while (log.next(key, val)) {
         System.out.println(key.toString() + " " + val.toString());
       }
     } finally {