@@ -24,12 +24,12 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Random;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.Vector;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * HRegion stores data for a certain region of a table. It stores all columns
@@ -75,10 +76,10 @@ public class HRegion implements HConstants {
   static String SPLITDIR = "splits";
   static String MERGEDIR = "merges";
   static String TMPREGION_PREFIX = "tmpregion_";
-  static int MIN_COMMITS_FOR_COMPACTION = 10;
   static Random rand = new Random();
-
   static final Log LOG = LogFactory.getLog(HRegion.class);
+  final AtomicBoolean closed = new AtomicBoolean(false);
+  private long noFlushCount = 0;
 
   /**
    * Deletes all the files for a HRegion
@@ -90,7 +91,7 @@ public class HRegion implements HConstants {
    */
   static void deleteRegion(FileSystem fs, Path baseDirectory,
       Text regionName) throws IOException {
-    LOG.debug("Deleting region " + regionName);
+    LOG.info("Deleting region " + regionName);
     fs.delete(HStoreFile.getHRegionDir(baseDirectory, regionName));
   }
 
@@ -263,7 +264,7 @@ public class HRegion implements HConstants {
   Map<Long, TreeMap<Text, byte []>> targetColumns
     = new HashMap<Long, TreeMap<Text, byte []>>();
 
-  HMemcache memcache;
+  final HMemcache memcache;
 
   Path rootDir;
   HLog log;
@@ -275,19 +276,16 @@ public class HRegion implements HConstants {
   static class WriteState {
     volatile boolean writesOngoing;
     volatile boolean writesEnabled;
-    volatile boolean closed;
     WriteState() {
       this.writesOngoing = true;
       this.writesEnabled = true;
-      this.closed = false;
     }
   }
 
   volatile WriteState writestate = new WriteState();
-  int recentCommits = 0;
-  volatile int commitsSinceFlush = 0;
 
-  int maxUnflushedEntries = 0;
+  final int memcacheFlushSize;
+  final int blockingMemcacheSize;
   int compactionThreshold = 0;
   private final HLocking lock = new HLocking();
   private long desiredMaxFileSize;
@@ -330,7 +328,6 @@ public class HRegion implements HConstants {
 
     this.writestate.writesOngoing = true;
     this.writestate.writesEnabled = true;
-    this.writestate.closed = false;
 
     // Declare the regionName. This is a unique string for the region, used to
     // build a unique filename.
@@ -349,7 +346,7 @@ public class HRegion implements HConstants {
         this.regionInfo.tableDesc.families().entrySet()) {
       Text colFamily = HStoreKey.extractFamily(e.getKey());
       stores.put(colFamily, new HStore(rootDir, this.regionInfo.regionName,
-        e.getValue(), fs, oldLogFile, conf));
+          e.getValue(), fs, oldLogFile, conf));
     }
 
     // Get rid of any splits or merges that were lost in-progress
@@ -362,14 +359,16 @@ public class HRegion implements HConstants {
       fs.delete(merges);
     }
 
-    // By default, we flush the cache after 10,000 commits
-
-    this.maxUnflushedEntries = conf.getInt("hbase.hregion.maxunflushed", 10000);
+    // By default, we flush the cache once the memcache reaches 16M.
+    this.memcacheFlushSize = conf.getInt("hbase.hregion.memcache.flush.size",
+      1024*1024*16);
+    this.blockingMemcacheSize = this.memcacheFlushSize *
+      conf.getInt("hbase.hregion.memcache.block.multiplier", 2);
 
-    // By default, we compact the region if an HStore has more than 10 map files
-
-    this.compactionThreshold =
-      conf.getInt("hbase.hregion.compactionThreshold", 10);
+    // By default, we compact the region if an HStore has more than
+    // 'hbase.hregion.compactionThreshold' (default: 3) map files.
+    this.compactionThreshold = conf.getInt("hbase.hregion.compactionThreshold",
+      3);
 
     // By default we split region if a file > DEFAULT_MAX_FILE_SIZE.
     this.desiredMaxFileSize =
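
An aside for reviewers: the two new sizes are related by a multiplier, so there is only one independent knob plus a factor. A minimal sketch of the arithmetic (the config names are the ones introduced above; the numbers are just the patch defaults, not a recommendation):

    // Sketch: relationship between the two new thresholds (patch defaults).
    public class MemcacheThresholds {
      public static void main(String[] args) {
        int flushSize = 1024 * 1024 * 16;  // hbase.hregion.memcache.flush.size
        int multiplier = 2;                // hbase.hregion.memcache.block.multiplier
        int blockingSize = flushSize * multiplier;
        // Above flushSize the region flushes; above blockingSize it blocks
        // new updates until a flush brings the memcache back down.
        System.out.println("flush at " + flushSize + ", block at " + blockingSize);
      }
    }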
@@ -387,22 +386,20 @@ public class HRegion implements HConstants {
 
   /** returns true if region is closed */
   boolean isClosed() {
-    boolean closed = false;
-    synchronized(writestate) {
-      closed = writestate.closed;
-    }
-    return closed;
+    return this.closed.get();
   }
 
   /**
    * Close down this HRegion. Flush the cache, shut down each HStore, don't
    * service any more calls.
    *
-   * This method could take some time to execute, so don't call it from a
+   * <p>This method could take some time to execute, so don't call it from a
    * time-sensitive thread.
    *
    * @return Vector of all the storage files that the HRegion's component
-   * HStores make use of. It's a list of HStoreFile objects.
+   * HStores make use of. It's a list of HStoreFile objects. Returns empty
+   * vector if already closed and null if it is judged that it should not
+   * close.
    *
    * @throws IOException
    */
@@ -424,14 +421,14 @@ public class HRegion implements HConstants {
    * @throws IOException
    */
   Vector<HStoreFile> close(boolean abort) throws IOException {
+    if (isClosed()) {
+      LOG.info("region " + this.regionInfo.regionName + " already closed");
+      return new Vector<HStoreFile>();
+    }
     lock.obtainWriteLock();
     try {
       boolean shouldClose = false;
       synchronized(writestate) {
-        if(writestate.closed) {
-          LOG.info("region " + this.regionInfo.regionName + " closed");
-          return new Vector<HStoreFile>();
-        }
         while(writestate.writesOngoing) {
           try {
             writestate.wait();
@@ -443,10 +440,16 @@ public class HRegion implements HConstants {
           shouldClose = true;
         }
       }
-      if(! shouldClose) {
+      if(!shouldClose) {
         return null;
       }
       LOG.info("closing region " + this.regionInfo.regionName);
+
+      // Write lock means no more row locks can be given out. Wait on
+      // outstanding row locks to come in before we close so we do not drop
+      // outstanding updates.
+      waitOnRowLocks();
+
       Vector<HStoreFile> allHStoreFiles = null;
       if (!abort) {
         // Don't flush the cache if we are aborting during a test.
@@ -459,9 +462,9 @@ public class HRegion implements HConstants {
         return allHStoreFiles;
       } finally {
         synchronized (writestate) {
-          writestate.closed = true;
           writestate.writesOngoing = false;
         }
+        this.closed.set(true);
         LOG.info("region " + this.regionInfo.regionName + " closed");
       }
     } finally {
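
The patch replaces the writestate.closed flag, which needed synchronized(writestate) around every read, with a standalone java.util.concurrent AtomicBoolean, so checks like isClosed() are lock-free. A minimal sketch of the pattern, with a hypothetical Resource class standing in for HRegion (the getAndSet variant shown is a common idempotent-close idiom; the patch itself checks isClosed() up front and does a plain set(true) in the finally):

    import java.util.concurrent.atomic.AtomicBoolean;

    // Hypothetical sketch of the lock-free closed flag.
    class Resource {
      private final AtomicBoolean closed = new AtomicBoolean(false);

      boolean isClosed() {
        return closed.get();        // no monitor needed for a consistent read
      }

      void close() {
        if (closed.getAndSet(true)) {
          return;                   // someone already closed us; run once only
        }
        // ... release underlying state exactly once ...
      }
    }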
@@ -476,8 +479,7 @@ public class HRegion implements HConstants {
    * Returns two brand-new (and open) HRegions
    */
   HRegion[] closeAndSplit(Text midKey, RegionUnavailableListener listener)
-    throws IOException {
-
+  throws IOException {
     if(((regionInfo.startKey.getLength() != 0)
         && (regionInfo.startKey.compareTo(midKey) > 0))
       || ((regionInfo.endKey.getLength() != 0)
@@ -486,8 +488,7 @@ public class HRegion implements HConstants {
       "boundaries.");
     }
 
-    LOG.info("Splitting region " + this.regionInfo.regionName);
-
+    long startTime = System.currentTimeMillis();
     Path splits = new Path(regiondir, SPLITDIR);
     if(! fs.exists(splits)) {
       fs.mkdirs(splits);
@@ -495,47 +496,44 @@ public class HRegion implements HConstants {
 
     long regionAId = Math.abs(rand.nextLong());
     HRegionInfo regionAInfo = new HRegionInfo(regionAId, regionInfo.tableDesc,
-      regionInfo.startKey, midKey);
-
+        regionInfo.startKey, midKey);
     long regionBId = Math.abs(rand.nextLong());
-    HRegionInfo regionBInfo
-      = new HRegionInfo(regionBId, regionInfo.tableDesc, midKey, null);
-
+    HRegionInfo regionBInfo =
+      new HRegionInfo(regionBId, regionInfo.tableDesc, midKey, null);
     Path dirA = HStoreFile.getHRegionDir(splits, regionAInfo.regionName);
     Path dirB = HStoreFile.getHRegionDir(splits, regionBInfo.regionName);
-
     if(fs.exists(dirA) || fs.exists(dirB)) {
-      throw new IOException("Cannot split; target file collision at " + dirA
-        + " or " + dirB);
+      throw new IOException("Cannot split; target file collision at " + dirA +
+        " or " + dirB);
     }
+
+    // We just copied most of the data. Now get whatever updates are up in
+    // the memcache (after shutting down new updates).
 
-    TreeSet<HStoreFile> alreadySplit = new TreeSet<HStoreFile>();
+    // Notify the caller that we are about to close the region. This moves
+    // us to the 'retiring' queue. Means no more updates coming in -- just
+    // whatever is outstanding.
+    listener.closing(this.getRegionName());
+
+    // Wait on the last row updates to come in.
+    LOG.debug("Starting wait on row locks.");
+    waitOnRowLocks();
 
     // Flush this HRegion out to storage, and turn off flushes
     // or compactions until close() is called.
-
-    // TODO: flushcache can come back null if it can't do the flush. FIX.
+    LOG.debug("Calling flushcache inside closeAndSplit");
     Vector<HStoreFile> hstoreFilesToSplit = flushcache(true);
+    if (hstoreFilesToSplit == null) {
+      // It should always return a list of hstore files even if memcache is
+      // empty. It will return null only if there is a concurrent compaction
+      // or split, which should not happen.
+      throw new NullPointerException("Flushcache did not return any files");
+    }
+    TreeSet<HStoreFile> alreadySplit = new TreeSet<HStoreFile>();
     for(HStoreFile hsf: hstoreFilesToSplit) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Splitting HStore " + hsf.getRegionName() + "/" +
-          hsf.getColFamily() + "/" + hsf.fileId());
-      }
-      HStoreFile dstA = new HStoreFile(conf, splits, regionAInfo.regionName,
-        hsf.getColFamily(), Math.abs(rand.nextLong()));
-      HStoreFile dstB = new HStoreFile(conf, splits, regionBInfo.regionName,
-        hsf.getColFamily(), Math.abs(rand.nextLong()));
-      hsf.splitStoreFile(midKey, dstA, dstB, fs, conf);
-      alreadySplit.add(hsf);
+      alreadySplit.add(splitStoreFile(hsf, splits, regionAInfo,
+        regionBInfo, midKey));
     }
-
-    // We just copied most of the data.
-
-    // Notify the caller that we are about to close the region
-    listener.closing(this.getRegionName());
-
-    // Wait on the last row updates to come in.
-    waitOnRowLocks();
 
     // Now close the HRegion
     hstoreFilesToSplit = close();
@@ -546,43 +544,46 @@ public class HRegion implements HConstants {
 
     // Copy the small remainder
     for(HStoreFile hsf: hstoreFilesToSplit) {
-      if(! alreadySplit.contains(hsf)) {
-        if(LOG.isDebugEnabled()) {
-          LOG.debug("Splitting HStore " + hsf.getRegionName() + "/"
-            + hsf.getColFamily() + "/" + hsf.fileId());
-        }
-
-        HStoreFile dstA = new HStoreFile(conf, splits, regionAInfo.regionName,
-          hsf.getColFamily(), Math.abs(rand.nextLong()));
-
-        HStoreFile dstB = new HStoreFile(conf, splits, regionBInfo.regionName,
-          hsf.getColFamily(), Math.abs(rand.nextLong()));
-
-        hsf.splitStoreFile(midKey, dstA, dstB, fs, conf);
+      if(!alreadySplit.contains(hsf)) {
+        splitStoreFile(hsf, splits, regionAInfo, regionBInfo, midKey);
       }
     }
 
     // Done
-
     HRegion regionA = new HRegion(rootDir, log, fs, conf, regionAInfo, dirA);
-
     HRegion regionB = new HRegion(rootDir, log, fs, conf, regionBInfo, dirB);
 
     // Cleanup
-
-    fs.delete(splits);    // Get rid of splits directory
-    fs.delete(regiondir); // and the directory for the old region
-
+    fs.delete(splits);      // Get rid of splits directory
+    fs.delete(regiondir);   // and the directory for the old region
     HRegion regions[] = new HRegion[2];
     regions[0] = regionA;
     regions[1] = regionB;
-
     LOG.info("Region split of " + this.regionInfo.regionName + " complete. " +
       "New regions are: " + regions[0].getRegionName() + ", " +
-      regions[1].getRegionName());
-
+      regions[1].getRegionName() + ". Took " +
+      StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
     return regions;
   }
+
+  private HStoreFile splitStoreFile(final HStoreFile hsf, final Path splits,
+    final HRegionInfo a, final HRegionInfo b, final Text midKey)
+  throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Started splitting HStore " + hsf.getRegionName() + "/" +
+        hsf.getColFamily() + "/" + hsf.fileId());
+    }
+    HStoreFile dstA = new HStoreFile(conf, splits, a.regionName,
+      hsf.getColFamily(), Math.abs(rand.nextLong()));
+    HStoreFile dstB = new HStoreFile(conf, splits, b.regionName,
+      hsf.getColFamily(), Math.abs(rand.nextLong()));
+    hsf.splitStoreFile(midKey, dstA, dstB, fs, conf);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Finished splitting HStore " + hsf.getRegionName() + "/" +
+        hsf.getColFamily() + "/" + hsf.fileId());
+    }
+    return hsf;
+  }
 
 //////////////////////////////////////////////////////////////////////////////
 // HRegion accessors
@@ -646,10 +647,10 @@ public class HRegion implements HConstants {
 //////////////////////////////////////////////////////////////////////////////
 
   /**
-   * Iterates through all the HStores and finds the one with the largest MapFile
-   * size. If the size is greater than the (currently hard-coded) threshold,
-   * returns true indicating that the region should be split. The midKey for the
-   * largest MapFile is returned through the midKey parameter.
+   * Iterates through all the HStores and finds the one with the largest
+   * MapFile size. If the size is greater than the (currently hard-coded)
+   * threshold, returns true indicating that the region should be split. The
+   * midKey for the largest MapFile is returned through the midKey parameter.
    *
    * @param midKey - (return value) midKey of the largest MapFile
    * @return - true if the region should be split
@@ -659,16 +660,27 @@ public class HRegion implements HConstants {
     try {
       Text key = new Text();
       long maxSize = 0;
+      long aggregateSize = 0;
       for(HStore store: stores.values()) {
         long size = store.getLargestFileSize(key);
+        aggregateSize += size;
         if(size > maxSize) { // Largest so far
           maxSize = size;
           midKey.set(key);
         }
       }
-
-      return (maxSize >
-        (this.desiredMaxFileSize + (this.desiredMaxFileSize / 2)));
+      long triggerSize =
+        this.desiredMaxFileSize + (this.desiredMaxFileSize / 2);
+      boolean split = (maxSize >= triggerSize || aggregateSize >= triggerSize);
+      if (split) {
+        LOG.info("Splitting " + getRegionName().toString() +
+          " because largest file is " + StringUtils.humanReadableInt(maxSize) +
+          ", aggregate size is " +
+          StringUtils.humanReadableInt(aggregateSize) +
+          " and desired size is " +
+          StringUtils.humanReadableInt(this.desiredMaxFileSize));
+      }
+      return split;
     } finally {
       lock.releaseReadLock();
    }
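
To make the new trigger concrete: the split point is 1.5x desiredMaxFileSize, and it now fires on the aggregate of all MapFile sizes as well as on the single largest file (and on >= rather than strictly >). A worked example, where 64MB is an illustrative desiredMaxFileSize rather than a known default:

    // Worked example of the 1.5x split trigger (illustrative numbers only).
    public class SplitTrigger {
      public static void main(String[] args) {
        long desiredMaxFileSize = 64L * 1024 * 1024;                      // 64MB
        long triggerSize = desiredMaxFileSize + (desiredMaxFileSize / 2); // 96MB
        long largest = 40L * 1024 * 1024;    // biggest single MapFile
        long aggregate = 100L * 1024 * 1024; // sum over all MapFiles
        // Old behavior: no split (largest 40MB never crosses 96MB).
        // New behavior: split, because the 100MB aggregate crosses the trigger.
        System.out.println(largest >= triggerSize || aggregate >= triggerSize);
      }
    }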
@@ -706,6 +718,9 @@ public class HRegion implements HConstants {
     for(HStore store: stores.values()) {
       if(store.getNMaps() > this.compactionThreshold) {
         needsCompaction = true;
+        LOG.info(getRegionName().toString() + " needs compaction because " +
+          store.getNMaps() + " store files present and threshold is " +
+          this.compactionThreshold);
         break;
       }
     }
@@ -719,46 +734,50 @@ public class HRegion implements HConstants {
    * Compact all the stores. This should be called periodically to make sure
    * the stores are kept manageable.
    *
-   * This operation could block for a long time, so don't call it from a
+   * <p>This operation could block for a long time, so don't call it from a
    * time-sensitive thread.
    *
-   * If it returns TRUE, the compaction has completed.
-   *
-   * If it returns FALSE, the compaction was not carried out, because the
-   * HRegion is busy doing something else storage-intensive (like flushing the
-   * cache). The caller should check back later.
+   * @return TRUE if the compaction has completed; FALSE if the compaction
+   * was not carried out, because the HRegion is busy doing something else
+   * storage-intensive (like flushing the cache). The caller should check
+   * back later.
    */
   boolean compactStores() throws IOException {
     boolean shouldCompact = false;
+    if (this.closed.get()) {
+      return shouldCompact;
+    }
     lock.obtainReadLock();
     try {
       synchronized (writestate) {
         if ((!writestate.writesOngoing) &&
-          writestate.writesEnabled &&
-          (!writestate.closed) &&
-          recentCommits > MIN_COMMITS_FOR_COMPACTION) {
+            writestate.writesEnabled) {
           writestate.writesOngoing = true;
           shouldCompact = true;
         }
       }
 
       if (!shouldCompact) {
-        LOG.info("not compacting region " + this.regionInfo);
+        LOG.info("NOT compacting region " +
+          this.regionInfo.getRegionName().toString());
         return false;
       }
 
-      LOG.info("starting compaction on region " + this.regionInfo);
+      long startTime = System.currentTimeMillis();
+      LOG.info("starting compaction on region " +
+        this.regionInfo.getRegionName().toString());
       for (HStore store : stores.values()) {
         store.compact();
       }
-      LOG.info("compaction completed on region " + this.regionInfo);
+      LOG.info("compaction completed on region " +
+        this.regionInfo.getRegionName().toString() + ". Took " +
+        StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
       return true;
 
     } finally {
       lock.releaseReadLock();
       synchronized (writestate) {
         writestate.writesOngoing = false;
-        recentCommits = 0;
         writestate.notifyAll();
       }
     }
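
The writesOngoing dance in compactStores() (and in flushcache() below) is a claim-work-release handshake on the writestate monitor; close() waits on the same monitor until the flag drops. A condensed sketch of the pattern, with hypothetical names:

    // Hypothetical sketch of the claim/work/release handshake.
    class WorkSlot {
      static class State { boolean ongoing; boolean enabled = true; }
      private final State writestate = new State();

      boolean runExclusive(Runnable work) {
        synchronized (writestate) {
          if (writestate.ongoing || !writestate.enabled) {
            return false;                 // busy; caller should retry later
          }
          writestate.ongoing = true;      // claim the single work slot
        }
        try {
          work.run();                     // storage-intensive work, no monitor held
          return true;
        } finally {
          synchronized (writestate) {
            writestate.ongoing = false;
            writestate.notifyAll();       // wakes a close() waiting on this flag
          }
        }
      }
    }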
@@ -769,50 +788,64 @@ public class HRegion implements HConstants {
    * only take if there have been a lot of uncommitted writes.
    */
   void optionallyFlush() throws IOException {
-    if(commitsSinceFlush > maxUnflushedEntries) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Flushing cache. Number of commits is: " + commitsSinceFlush);
-      }
+    if(this.memcache.getSize() > this.memcacheFlushSize) {
+      flushcache(false);
+    } else if (this.memcache.getSize() > 0 && this.noFlushCount >= 10) {
+      LOG.info("Optional flush called " + this.noFlushCount +
+        " times when data present without flushing. Forcing one.");
       flushcache(false);
+      if (this.memcache.getSize() > 0) {
+        // Only increment if something in the cache.
+        // Gets zero'd when a flushcache is called.
+        this.noFlushCount++;
+      }
     }
   }
 
   /**
-   * Flush the cache. This is called periodically to minimize the amount of log
-   * processing needed upon startup.
+   * Flush the cache. This is called periodically to minimize the amount of
+   * log processing needed upon startup.
    *
-   * The returned Vector is a list of all the files used by the component HStores.
-   * It is a list of HStoreFile objects. If the returned value is NULL, then the
-   * flush could not be executed, because the HRegion is busy doing something
-   * else storage-intensive. The caller should check back later.
+   * <p>The returned Vector is a list of all the files used by the component
+   * HStores. It is a list of HStoreFile objects. If the returned value is
+   * NULL, then the flush could not be executed, because the HRegion is busy
+   * doing something else storage-intensive. The caller should check back
+   * later.
    *
-   * The 'disableFutureWrites' boolean indicates that the caller intends to
+   * <p>This method may block for some time, so it should not be called from a
    * time-sensitive thread.
    *
+   * @param disableFutureWrites indicates that the caller intends to
    * close() the HRegion shortly, so the HRegion should not take on any new and
-   * potentially long-lasting disk operations. This flush() should be the final
+   * potentially long-lasting disk operations. This flush() should be the final
    * pre-close() disk operation.
-   *
-   * This method may block for some time, so it should not be called from a
-   * time-sensitive thread.
+   *
+   * @return List of store files including new flushes, if any. If no flushes
+   * because the memcache is empty, returns all current store files. Returns
+   * null if no flush (writes are going on elsewhere -- concurrently we are
+   * compacting or splitting).
    */
-  Vector<HStoreFile> flushcache(boolean disableFutureWrites) throws IOException {
+  Vector<HStoreFile> flushcache(boolean disableFutureWrites)
+  throws IOException {
+    if (this.closed.get()) {
+      return null;
+    }
+    this.noFlushCount = 0;
     boolean shouldFlush = false;
     synchronized(writestate) {
-      if((! writestate.writesOngoing)
-        && writestate.writesEnabled
-        && (! writestate.closed)) {
-
+      if((!writestate.writesOngoing) &&
+          writestate.writesEnabled) {
        writestate.writesOngoing = true;
        shouldFlush = true;
-
        if(disableFutureWrites) {
          writestate.writesEnabled = false;
        }
      }
    }
-
-    if(! shouldFlush) {
+
+    if(!shouldFlush) {
      if(LOG.isDebugEnabled()) {
-        LOG.debug("not flushing cache for region " +
+        LOG.debug("NOT flushing memcache for region " +
          this.regionInfo.regionName);
      }
      return null;
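
The net effect of the optionallyFlush() rewrite: the old policy counted commits (10,000 of them, however large or small each was), while the new one measures bytes and adds a staleness backstop that forces a flush after roughly ten idle checks with unflushed data. Restated as a standalone decision function (hypothetical names; the constant 10 is hard-coded in the patch):

    // Hypothetical restatement of the new flush decision.
    final class FlushPolicy {
      static boolean shouldFlush(long memcacheBytes, long flushSize,
          long noFlushCount) {
        return memcacheBytes > flushSize                    // size-based trigger
            || (memcacheBytes > 0 && noFlushCount >= 10);   // staleness backstop
      }
    }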
@@ -820,7 +853,6 @@ public class HRegion implements HConstants {
 
     try {
       return internalFlushcache();
-
     } finally {
       synchronized (writestate) {
         writestate.writesOngoing = false;
@@ -831,73 +863,69 @@ public class HRegion implements HConstants {
   /**
    * Flushing the cache is a little tricky. We have a lot of updates in the
-   * HMemcache, all of which have also been written to the log. We need to write
-   * those updates in the HMemcache out to disk, while being able to process
-   * reads/writes as much as possible during the flush operation. Also, the log
-   * has to state clearly the point in time at which the HMemcache was flushed.
-   * (That way, during recovery, we know when we can rely on the on-disk flushed
-   * structures and when we have to recover the HMemcache from the log.)
+   * HMemcache, all of which have also been written to the log. We need to
+   * write those updates in the HMemcache out to disk, while being able to
+   * process reads/writes as much as possible during the flush operation. Also,
+   * the log has to state clearly the point in time at which the HMemcache was
+   * flushed. (That way, during recovery, we know when we can rely on the
+   * on-disk flushed structures and when we have to recover the HMemcache from
+   * the log.)
    *
-   * So, we have a three-step process:
+   * <p>So, we have a three-step process:
    *
-   * A. Flush the memcache to the on-disk stores, noting the current sequence ID
-   *    for the log.
+   * <ul><li>A. Flush the memcache to the on-disk stores, noting the current
+   * sequence ID for the log.</li>
    *
-   * B. Write a FLUSHCACHE-COMPLETE message to the log, using the sequence ID
-   *    that was current at the time of memcache-flush.
+   * <li>B. Write a FLUSHCACHE-COMPLETE message to the log, using the sequence
+   * ID that was current at the time of memcache-flush.</li>
    *
-   * C. Get rid of the memcache structures that are now redundant, as they've
-   *    been flushed to the on-disk HStores.
+   * <li>C. Get rid of the memcache structures that are now redundant, as
+   * they've been flushed to the on-disk HStores.</li>
+   * </ul>
+   * <p>This method is protected, but can be accessed via several public
+   * routes.
    *
-   * This method is protected, but can be accessed via several public routes.
+   * <p>This method may block for some time.
    *
-   * This method may block for some time.
+   * @return List of store files including just-made new flushes per-store. If
+   * no flush happened, returns the list of all current store files.
    */
   Vector<HStoreFile> internalFlushcache() throws IOException {
-    Vector<HStoreFile> allHStoreFiles = new Vector<HStoreFile>();
-
+
+    long startTime = -1;
     if(LOG.isDebugEnabled()) {
-      LOG.debug("flushing cache for region " + this.regionInfo.regionName);
+      startTime = System.currentTimeMillis();
+      LOG.debug("Started memcache flush for region " +
+        this.regionInfo.regionName + ". Size " +
+        StringUtils.humanReadableInt(this.memcache.getSize()));
     }
 
-    // We pass the log to the HMemcache, so we can lock down
-    // both simultaneously. We only have to do this for a moment:
-    // we need the HMemcache state at the time of a known log sequence
-    // number. Since multiple HRegions may write to a single HLog,
-    // the sequence numbers may zoom past unless we lock it.
+    // We pass the log to the HMemcache, so we can lock down both
+    // simultaneously. We only have to do this for a moment: we need the
+    // HMemcache state at the time of a known log sequence number. Since
+    // multiple HRegions may write to a single HLog, the sequence numbers may
+    // zoom past unless we lock it.
    //
-    // When execution returns from snapshotMemcacheForLog()
-    // with a non-NULL value, the HMemcache will have a snapshot
-    // object stored that must be explicitly cleaned up using
-    // a call to deleteSnapshot().
-
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("starting memcache snapshot");
-    }
-
+    // When execution returns from snapshotMemcacheForLog() with a non-NULL
+    // value, the HMemcache will have a snapshot object stored that must be
+    // explicitly cleaned up using a call to deleteSnapshot().
    HMemcache.Snapshot retval = memcache.snapshotMemcacheForLog(log);
-    TreeMap<HStoreKey, byte []> memcacheSnapshot = retval.memcacheSnapshot;
-    if(memcacheSnapshot == null) {
-      for(HStore hstore: stores.values()) {
-        Vector<HStoreFile> hstoreFiles = hstore.getAllMapFiles();
-        allHStoreFiles.addAll(0, hstoreFiles);
-      }
-      return allHStoreFiles;
+    if(retval == null || retval.memcacheSnapshot == null) {
+      return getAllStoreFiles();
    }
-
    long logCacheFlushId = retval.sequenceId;
-
-    // A. Flush memcache to all the HStores.
-
    if(LOG.isDebugEnabled()) {
-      LOG.debug("flushing memcache to HStores");
+      LOG.debug("Snapshotted memcache for region " +
+        this.regionInfo.regionName + ". Sequence id " + retval.sequenceId);
    }
-
-    for(Iterator<HStore> it = stores.values().iterator(); it.hasNext(); ) {
-      HStore hstore = it.next();
-      Vector<HStoreFile> hstoreFiles
-        = hstore.flushCache(memcacheSnapshot, logCacheFlushId);
-
+
+    // A. Flush memcache to all the HStores.
+    // Keep running vector of all store files that includes both old and the
+    // just-made new flush store file.
+    Vector<HStoreFile> allHStoreFiles = new Vector<HStoreFile>();
+    for(HStore hstore: stores.values()) {
+      Vector<HStoreFile> hstoreFiles
+        = hstore.flushCache(retval.memcacheSnapshot, retval.sequenceId);
      allHStoreFiles.addAll(0, hstoreFiles);
    }
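
The A/B/C sequence described above is a classic write-ahead-log checkpoint. A schematic of the pattern stripped of the HBase types, with all names hypothetical and a TreeMap standing in for both memcache and store:

    // Hypothetical schematic of the snapshot/flush/mark checkpoint pattern.
    class Checkpoint {
      interface Wal {
        long currentSequenceId();
        void completeCacheFlush(long seqId); // entries <= seqId now redundant
      }

      static void run(Wal log, java.util.TreeMap<String, byte[]> memcache,
          java.util.TreeMap<String, byte[]> store) {
        long seqId;
        java.util.TreeMap<String, byte[]> snapshot;
        synchronized (log) {                  // freeze the log position and...
          seqId = log.currentSequenceId();
          snapshot = new java.util.TreeMap<String, byte[]>(memcache); // ...memory
        }
        store.putAll(snapshot);               // A. persist the snapshot
        log.completeCacheFlush(seqId);        // B. mark the flush point in the log
        memcache.keySet().removeAll(snapshot.keySet()); // C. drop flushed entries
      }
    }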
@@ -906,27 +934,32 @@ public class HRegion implements HConstants {
     // and that all updates to the log for this regionName that have lower
     // log-sequence-ids can be safely ignored.
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("writing flush cache complete to log");
-    }
-
     log.completeCacheFlush(this.regionInfo.regionName,
-      regionInfo.tableDesc.getName(), logCacheFlushId);
+        regionInfo.tableDesc.getName(), logCacheFlushId);
 
     // C. Delete the now-irrelevant memcache snapshot; its contents have been
     // dumped to disk-based HStores.
-
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("deleting memcache snapshot");
-    }
-
     memcache.deleteSnapshot();
 
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("cache flush complete for region " + this.regionInfo.regionName);
+    // D. Finally notify anyone waiting on memcache to clear:
+    // e.g. checkResources().
+    synchronized(this) {
+      notifyAll();
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Finished memcache flush for region " +
+        this.regionInfo.regionName + " in " +
+        (System.currentTimeMillis() - startTime) + "ms");
+    }
+    return allHStoreFiles;
+  }
+
+  private Vector<HStoreFile> getAllStoreFiles() {
+    Vector<HStoreFile> allHStoreFiles = new Vector<HStoreFile>();
+    for(HStore hstore: stores.values()) {
+      Vector<HStoreFile> hstoreFiles = hstore.getAllMapFiles();
+      allHStoreFiles.addAll(0, hstoreFiles);
     }
-
-    this.commitsSinceFlush = 0;
     return allHStoreFiles;
   }
@@ -947,9 +980,10 @@ public class HRegion implements HConstants {
 
   /** Fetch multiple versions of a single data item, with timestamp. */
   byte [][] get(Text row, Text column, long timestamp, int numVersions)
-    throws IOException {
-    if(writestate.closed) {
-      throw new IOException("HRegion is closed.");
+  throws IOException {
+    if (this.closed.get()) {
+      throw new IOException("Region " + this.getRegionName().toString() +
+        " closed");
     }
 
     // Make sure this is a valid row and valid column
@@ -1065,17 +1099,75 @@ public class HRegion implements HConstants {
    * after a specified quiet period.
    *
    * @param row Row to update
-   * @return lockid
+   * @return lock id
    * @throws IOException
    * @see #put(long, Text, byte[])
    */
   public long startUpdate(Text row) throws IOException {
-    // We obtain a per-row lock, so other clients will block while one client
-    // performs an update. The read lock is released by the client calling
-    // #commit or #abort or if the HRegionServer lease on the lock expires.
-    // See HRegionServer#RegionListener for how the expire on HRegionServer
-    // invokes a HRegion#abort.
-    return obtainRowLock(row);
+    // Do a rough check that we have resources to accept a write. The check is
+    // 'rough' in that between the resource check and the call to obtain a
+    // read lock, resources may run out. For now, the thought is that this
+    // will be extremely rare; we'll deal with it when it happens.
+    checkResources();
+
+    if (this.closed.get()) {
+      throw new IOException("Region " + this.getRegionName().toString() +
+        " closed");
+    }
+
+    // Get a read lock. We will not be able to get one if we are closing or
+    // if this region is being split. In neither case should we be allowing
+    // updates.
+    this.lock.obtainReadLock();
+    try {
+      // We obtain a per-row lock, so other clients will block while one client
+      // performs an update. The read lock is released by the client calling
+      // #commit or #abort or if the HRegionServer lease on the lock expires.
+      // See HRegionServer#RegionListener for how the expire on HRegionServer
+      // invokes a HRegion#abort.
+      return obtainRowLock(row);
+    } finally {
+      this.lock.releaseReadLock();
+    }
+  }
+
+  /*
+   * Check if we have the resources to support an update.
+   *
+   * For now, just checks memcache saturation.
+   *
+   * Here we synchronize on HRegion, a broad-scoped lock. It's appropriate
+   * given we're figuring in here whether this region is able to take on
+   * writes. At time of writing this is the only synchronized method: just
+   * this and the synchronized block on 'this' inside internalFlushcache
+   * that sends the notify.
+   */
+  private synchronized void checkResources() {
+    if (checkCommitsSinceFlush()) {
+      return;
+    }
+
+    LOG.warn("Blocking updates for '" + Thread.currentThread().getName() +
+      "': Memcache size " +
+      StringUtils.humanReadableInt(this.memcache.getSize()) +
+      " is >= blocking size " +
+      StringUtils.humanReadableInt(this.blockingMemcacheSize));
+    while (!checkCommitsSinceFlush()) {
+      try {
+        wait();
+      } catch (InterruptedException e) {
+        // continue;
+      }
+    }
+    LOG.warn("Unblocking updates for '" + Thread.currentThread().getName() +
+      "'");
+  }
+
+  /*
+   * @return True if the memcache is under the blocking threshold.
+   */
+  private boolean checkCommitsSinceFlush() {
+    return this.memcache.getSize() < this.blockingMemcacheSize;
   }
 
   /**
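
checkResources() and the synchronized(this){ notifyAll(); } added to internalFlushcache() are the two halves of a guarded wait: writers park on the region monitor while the memcache is over blockingMemcacheSize, and every completed flush wakes them to re-check. A generic sketch, with a hypothetical class standing in for the region:

    // Hypothetical sketch of the writer backpressure gate.
    class BackpressureGate {
      private final long blockingSize;
      private long bytes = 0;

      BackpressureGate(long blockingSize) { this.blockingSize = blockingSize; }

      synchronized void beforeWrite(long writeBytes) {
        while (bytes >= blockingSize) {
          try {
            wait();               // park until a flush frees space
          } catch (InterruptedException e) {
            // ignore and re-check, as the patch does
          }
        }
        bytes += writeBytes;
      }

      synchronized void afterFlush(long flushedBytes) {
        bytes -= flushedBytes;
        notifyAll();              // wake every parked writer to re-check
      }
    }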
@@ -1142,11 +1234,11 @@ public class HRegion implements HConstants {
       throw new LockException("Locking error: put operation on lock " +
         lockid + " unexpected aborted by another thread");
     }
-
-    TreeMap<Text, byte []> targets = this.targetColumns.get(lockid);
+    Long lid = Long.valueOf(lockid);
+    TreeMap<Text, byte []> targets = this.targetColumns.get(lid);
     if (targets == null) {
       targets = new TreeMap<Text, byte []>();
-      this.targetColumns.put(lockid, targets);
+      this.targetColumns.put(lid, targets);
     }
     targets.put(targetCol, val);
   }
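
This hunk and the two that follow share one mechanical change: box the primitive lockid into a Long once and reuse it for every map call (targetColumns is keyed on Long), rather than relying on autoboxing at each call site. A small illustrative sketch:

    // Sketch of the lockid boxing change (illustrative only).
    import java.util.HashMap;
    import java.util.Map;

    public class LockIdBoxing {
      public static void main(String[] args) {
        Map<Long, String> targets = new HashMap<Long, String>();
        long lockid = 42L;
        Long lid = Long.valueOf(lockid); // box once...
        targets.put(lid, "columns");     // ...then reuse the same Long for
        targets.get(lid);                // every map call instead of
        targets.remove(lid);             // autoboxing at each call site
      }
    }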
@@ -1207,18 +1299,17 @@ public class HRegion implements HConstants {
       // hasn't aborted/committed the write-operation
       synchronized(row) {
         // Add updates to the log and add values to the memcache.
-        TreeMap<Text, byte []> columns = this.targetColumns.get(lockid);
+        Long lid = Long.valueOf(lockid);
+        TreeMap<Text, byte []> columns = this.targetColumns.get(lid);
         if (columns != null && columns.size() > 0) {
           log.append(regionInfo.regionName, regionInfo.tableDesc.getName(),
             row, columns, timestamp);
           memcache.add(row, columns, timestamp);
           // OK, all done!
         }
-        targetColumns.remove(lockid);
+        targetColumns.remove(lid);
         releaseRowLock(row);
       }
-    recentCommits++;
-    this.commitsSinceFlush++;
   }
 
 //////////////////////////////////////////////////////////////////////////////
@@ -1284,11 +1375,11 @@ public class HRegion implements HConstants {
         }
       }
 
-      long lockid = Math.abs(rand.nextLong());
-      rowsToLocks.put(row, lockid);
-      locksToRows.put(lockid, row);
+      Long lid = Long.valueOf(Math.abs(rand.nextLong()));
+      rowsToLocks.put(row, lid);
+      locksToRows.put(lid, row);
       rowsToLocks.notifyAll();
-      return lockid;
+      return lid.longValue();
     }
   }
@@ -1451,7 +1542,6 @@ public class HRegion implements HConstants {
           // If we are doing a wild card match or there are multiple
           // matchers per column, we need to scan all the older versions of
           // this row to pick up the rest of the family members
-
           if (!wildcardMatch
               && !multipleMatchers
               && (keys[i].getTimestamp() != chosenTimestamp)) {
@@ -1467,7 +1557,6 @@ public class HRegion implements HConstants {
           // but this had the effect of overwriting newer
           // values with older ones. So now we only insert
           // a result if the map does not contain the key.
-
           for (Map.Entry<Text, byte[]> e : resultSets[i].entrySet()) {
             if (!filtered && moreToFollow &&
                 !results.containsKey(e.getKey())) {
@@ -1516,7 +1605,9 @@ public class HRegion implements HConstants {
           }
         }
         if (LOG.isDebugEnabled()) {
-          LOG.debug("ROWKEY = " + chosenRow + ", FILTERED = " + filtered);
+          if (this.dataFilter != null) {
+            LOG.debug("ROWKEY = " + chosenRow + ", FILTERED = " + filtered);
+          }
         }
       }
 
@@ -1640,7 +1731,10 @@ public class HRegion implements HConstants {
     client.put(lockid, COL_STARTCODE,
       String.valueOf(startCode).getBytes(UTF8_ENCODING));
     client.commit(lockid);
-    LOG.info("Added region " + region.getRegionName() + " to table " + table);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Added region " + region.getRegionName() + " to table " +
+        table);
+    }
   }
 
   /**
@@ -1659,7 +1753,9 @@ public class HRegion implements HConstants {
     client.delete(lockid, COL_SERVER);
     client.delete(lockid, COL_STARTCODE);
     client.commit(lockid);
-    LOG.info("Removed " + regionName + " from table " + table);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Removed " + regionName + " from table " + table);
+    }
   }
 
   /**