@@ -25,8 +25,6 @@ import org.apache.hadoop.conf.*;
 
 import java.io.*;
 import java.util.*;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /*******************************************************************************
  * HRegionServer makes a set of HRegions available to clients. It checks in with
@@ -52,7 +50,7 @@ public class HRegionServer
   private Configuration conf;
   private Random rand;
   private TreeMap<Text, HRegion> regions;   // region name -> HRegion
-  private ReadWriteLock locker;
+  private HLocking lock;
   private Vector<HMsg> outboundMsgs;
 
   private long threadWakeFrequency;
@@ -61,11 +59,12 @@ public class HRegionServer
 
   // Check to see if regions should be split
 
-  private long splitCheckFrequency;
-  private SplitChecker splitChecker;
-  private Thread splitCheckerThread;
+  private long splitOrCompactCheckFrequency;
+  private SplitOrCompactChecker splitOrCompactChecker;
+  private Thread splitOrCompactCheckerThread;
+  private Integer splitOrCompactLock = new Integer(0);
 
-  private class SplitChecker implements Runnable {
+  private class SplitOrCompactChecker implements Runnable {
     private HClient client = new HClient(conf);
 
     private class SplitRegion {
@@ -82,116 +81,122 @@ public class HRegionServer
       while(! stopRequested) {
         long startTime = System.currentTimeMillis();
 
-        // Grab a list of regions to check
-
-        Vector<HRegion> checkSplit = new Vector<HRegion>();
-        locker.readLock().lock();
-        try {
-          checkSplit.addAll(regions.values());
-        } finally {
-          locker.readLock().unlock();
-        }
+        synchronized(splitOrCompactLock) {
 
-        // Check to see if they need splitting
+          // Grab a list of regions to check
 
-        Vector<SplitRegion> toSplit = new Vector<SplitRegion>();
-        for(Iterator<HRegion> it = checkSplit.iterator(); it.hasNext(); ) {
-          HRegion cur = it.next();
-          Text midKey = new Text();
-
+          Vector<HRegion> regionsToCheck = new Vector<HRegion>();
+          lock.obtainReadLock();
           try {
-            if(cur.needsSplit(midKey)) {
+            regionsToCheck.addAll(regions.values());
+          } finally {
+            lock.releaseReadLock();
+          }
+
+          // Check to see if they need splitting or compacting
+
+          Vector<SplitRegion> toSplit = new Vector<SplitRegion>();
+          Vector<HRegion> toCompact = new Vector<HRegion>();
+          for(Iterator<HRegion> it = regionsToCheck.iterator(); it.hasNext(); ) {
+            HRegion cur = it.next();
+            Text midKey = new Text();
+
+            if(cur.needsCompaction()) {
+              toCompact.add(cur);
+
+            } else if(cur.needsSplit(midKey)) {
               toSplit.add(new SplitRegion(cur, midKey));
             }
-
-          } catch(IOException iex) {
-            iex.printStackTrace();
           }
-        }
 
-        for(Iterator<SplitRegion> it = toSplit.iterator(); it.hasNext(); ) {
-          SplitRegion r = it.next();
-
-          locker.writeLock().lock();
-          regions.remove(r.region.getRegionName());
-          locker.writeLock().unlock();
-
-          HRegion[] newRegions = null;
           try {
-            Text oldRegion = r.region.getRegionName();
-
-            LOG.info("splitting region: " + oldRegion);
-
-            newRegions = r.region.closeAndSplit(r.midKey);
+            for(Iterator<HRegion> it = toCompact.iterator(); it.hasNext(); ) {
+              it.next().compactStores();
+            }
 
-            // When a region is split, the META table needs to updated if we're
-            // splitting a 'normal' region, and the ROOT table needs to be
-            // updated if we are splitting a META region.
+            for(Iterator<SplitRegion> it = toSplit.iterator(); it.hasNext(); ) {
+              SplitRegion r = it.next();
 
-            Text tableToUpdate
-              = (oldRegion.find(META_TABLE_NAME.toString()) == 0)
-              ? ROOT_TABLE_NAME : META_TABLE_NAME;
+              lock.obtainWriteLock();
+              regions.remove(r.region.getRegionName());
+              lock.releaseWriteLock();
 
-            if(LOG.isDebugEnabled()) {
-              LOG.debug("region split complete. updating meta");
-            }
-
-            client.openTable(tableToUpdate);
-            long lockid = client.startUpdate(oldRegion);
-            client.delete(lockid, COL_REGIONINFO);
-            client.delete(lockid, COL_SERVER);
-            client.delete(lockid, COL_STARTCODE);
-            client.commit(lockid);
-
-            for(int i = 0; i < newRegions.length; i++) {
-              ByteArrayOutputStream bytes = new ByteArrayOutputStream();
-              DataOutputStream out = new DataOutputStream(bytes);
-              newRegions[i].getRegionInfo().write(out);
-
-              lockid = client.startUpdate(newRegions[i].getRegionName());
-              client.put(lockid, COL_REGIONINFO, bytes.toByteArray());
+              HRegion[] newRegions = null;
+              Text oldRegion = r.region.getRegionName();
+
+              LOG.info("splitting region: " + oldRegion);
+
+              newRegions = r.region.closeAndSplit(r.midKey);
+
+              // When a region is split, the META table needs to be updated if we're
+              // splitting a 'normal' region, and the ROOT table needs to be
+              // updated if we are splitting a META region.
+
+              Text tableToUpdate =
+                (oldRegion.find(META_TABLE_NAME.toString()) == 0) ?
+                  ROOT_TABLE_NAME : META_TABLE_NAME;
+
+              if(LOG.isDebugEnabled()) {
+                LOG.debug("region split complete. updating meta");
+              }
+
+              client.openTable(tableToUpdate);
+              long lockid = client.startUpdate(oldRegion);
+              client.delete(lockid, COL_REGIONINFO);
+              client.delete(lockid, COL_SERVER);
+              client.delete(lockid, COL_STARTCODE);
               client.commit(lockid);
+
+              for(int i = 0; i < newRegions.length; i++) {
+                ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+                DataOutputStream out = new DataOutputStream(bytes);
+                newRegions[i].getRegionInfo().write(out);
+
+                lockid = client.startUpdate(newRegions[i].getRegionName());
+                client.put(lockid, COL_REGIONINFO, bytes.toByteArray());
+                client.commit(lockid);
+              }
+
+              // Now tell the master about the new regions
+
+              if(LOG.isDebugEnabled()) {
+                LOG.debug("reporting region split to master");
+              }
+
+              reportSplit(newRegions[0].getRegionInfo(), newRegions[1].getRegionInfo());
+
+              LOG.info("region split successful. old region=" + oldRegion
+                + ", new regions: " + newRegions[0].getRegionName() + ", "
+                + newRegions[1].getRegionName());
+
+              newRegions[0].close();
+              newRegions[1].close();
+
             }
-
-            // Now tell the master about the new regions
-
-            if(LOG.isDebugEnabled()) {
-              LOG.debug("reporting region split to master");
-            }
-
-            reportSplit(newRegions[0].getRegionInfo(), newRegions[1].getRegionInfo());
-
-            LOG.info("region split successful. old region=" + oldRegion
-              + ", new regions: " + newRegions[0].getRegionName() + ", "
-              + newRegions[1].getRegionName());
-
-            newRegions[0].close();
-            newRegions[1].close();
-
           } catch(IOException e) {
             //TODO: What happens if this fails? Are we toast?
-            e.printStackTrace();
-            continue;
+            LOG.error(e);
           }
         }
 
         // Sleep
         long waitTime = stopRequested ? 0
-          : splitCheckFrequency - (System.currentTimeMillis() - startTime);
+          : splitOrCompactCheckFrequency - (System.currentTimeMillis() - startTime);
         if (waitTime > 0) {
           try {
             if (LOG.isDebugEnabled()) {
-              LOG.debug("Sleep splitChecker");
+              LOG.debug("Sleep splitOrCompactChecker");
             }
             Thread.sleep(waitTime);
             if (LOG.isDebugEnabled()) {
-              LOG.debug("Wake splitChecker");
+              LOG.debug("Wake splitOrCompactChecker");
             }
           } catch(InterruptedException iex) {
           }
         }
       }
       if(LOG.isDebugEnabled()) {
-        LOG.debug("splitChecker exiting");
+        LOG.debug("splitOrCompactChecker exiting");
       }
     }
   }
@@ -200,35 +205,39 @@ public class HRegionServer
   private Flusher cacheFlusher;
   private Thread cacheFlusherThread;
+  private Integer cacheFlusherLock = new Integer(0);
   private class Flusher implements Runnable {
     public void run() {
       while(! stopRequested) {
         long startTime = System.currentTimeMillis();
 
-        // Grab a list of items to flush
-
-        Vector<HRegion> toFlush = new Vector<HRegion>();
-        locker.readLock().lock();
-        try {
-          toFlush.addAll(regions.values());
-
-        } finally {
-          locker.readLock().unlock();
-        }
+        synchronized(cacheFlusherLock) {
 
-        // Flush them, if necessary
+          // Grab a list of items to flush
 
-        for(Iterator<HRegion> it = toFlush.iterator(); it.hasNext(); ) {
-          HRegion cur = it.next();
-
+          Vector<HRegion> toFlush = new Vector<HRegion>();
+          lock.obtainReadLock();
           try {
-            cur.optionallyFlush();
-
-          } catch(IOException iex) {
-            iex.printStackTrace();
+            toFlush.addAll(regions.values());
+
+          } finally {
+            lock.releaseReadLock();
           }
-        }
 
+          // Flush them, if necessary
+
+          for(Iterator<HRegion> it = toFlush.iterator(); it.hasNext(); ) {
+            HRegion cur = it.next();
+
+            try {
+              cur.optionallyFlush();
+
+            } catch(IOException iex) {
+              iex.printStackTrace();
+            }
+          }
+        }
+
         // Sleep
         long waitTime = stopRequested ? 0
           : threadWakeFrequency - (System.currentTimeMillis() - startTime);
@@ -262,18 +271,25 @@ public class HRegionServer
   private HLog log;
   private LogRoller logRoller;
   private Thread logRollerThread;
+  private Integer logRollerLock = new Integer(0);
   private class LogRoller implements Runnable {
     public void run() {
       while(! stopRequested) {
-        // If the number of log entries is high enough, roll the log. This is a
-        // very fast operation, but should not be done too frequently.
-        if(log.getNumEntries() > maxLogEntries) {
-          try {
-            log.rollWriter();
-          } catch(IOException iex) {
+        synchronized(logRollerLock) {
+          // If the number of log entries is high enough, roll the log. This is a
+          // very fast operation, but should not be done too frequently.
+          int nEntries = log.getNumEntries();
+          if(nEntries > maxLogEntries) {
+            try {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Rolling log. Number of entries is: " + nEntries);
+              }
+              log.rollWriter();
+            } catch(IOException iex) {
+            }
           }
         }
-
+
         if(!stopRequested) {
           if (LOG.isDebugEnabled()) {
             LOG.debug("Sleep logRoller");
@@ -323,7 +339,7 @@ public class HRegionServer
     this.conf = conf;
     this.rand = new Random();
    this.regions = new TreeMap<Text, HRegion>();
-    this.locker = new ReentrantReadWriteLock();
+    this.lock = new HLocking();
     this.outboundMsgs = new Vector<HMsg>();
     this.scanners =
       Collections.synchronizedMap(new TreeMap<Text, HInternalScannerInterface>());
@@ -333,8 +349,8 @@ public class HRegionServer
     this.maxLogEntries = conf.getInt("hbase.regionserver.maxlogentries", 30 * 1000);
     this.msgInterval = conf.getLong("hbase.regionserver.msginterval",
         15 * 1000);
-    this.splitCheckFrequency =
-      conf.getLong("hbase.regionserver.thread.splitcheckfrequency", 60 * 1000);
+    this.splitOrCompactCheckFrequency =
+      conf.getLong("hbase.regionserver.thread.splitcompactcheckfrequency", 60 * 1000);
 
     // Cache flushing
     this.cacheFlusher = new Flusher();
@@ -342,9 +358,9 @@ public class HRegionServer
       new Thread(cacheFlusher, "HRegionServer.cacheFlusher");
 
     // Check regions to see if they need to be split
-    this.splitChecker = new SplitChecker();
-    this.splitCheckerThread =
-      new Thread(splitChecker, "HRegionServer.splitChecker");
+    this.splitOrCompactChecker = new SplitOrCompactChecker();
+    this.splitOrCompactCheckerThread =
+      new Thread(splitOrCompactChecker, "HRegionServer.splitOrCompactChecker");
 
     // Process requests from Master
     this.toDo = new Vector<HMsg>();
@@ -386,7 +402,7 @@ public class HRegionServer
     // Threads
     this.workerThread.start();
     this.cacheFlusherThread.start();
-    this.splitCheckerThread.start();
+    this.splitOrCompactCheckerThread.start();
     this.logRollerThread.start();
     this.leases = new Leases(conf.getLong("hbase.regionserver.lease.period",
         3 * 60 * 1000), threadWakeFrequency);
@@ -429,14 +445,14 @@ public class HRegionServer
     } catch(InterruptedException iex) {
     }
     try {
-      this.splitCheckerThread.join();
+      this.splitOrCompactCheckerThread.join();
     } catch(InterruptedException iex) {
     }
     try {
       this.server.join();
     } catch(InterruptedException iex) {
     }
-    LOG.info("server stopped at: " + address.toString());
+    LOG.info("HRegionServer stopped at: " + address.toString());
   }
 
   /**
@@ -568,9 +584,17 @@ public class HRegionServer
 
     // Send interrupts to wake up threads if sleeping so they notice shutdown.
 
-    this.logRollerThread.interrupt();
-    this.cacheFlusherThread.interrupt();
-    this.splitCheckerThread.interrupt();
+    synchronized(logRollerLock) {
+      this.logRollerThread.interrupt();
+    }
+
+    synchronized(cacheFlusherLock) {
+      this.cacheFlusherThread.interrupt();
+    }
+
+    synchronized(splitOrCompactLock) {
+      this.splitOrCompactCheckerThread.interrupt();
+    }
 
     this.worker.stop();
     this.server.stop();
@@ -721,7 +745,7 @@ public class HRegionServer
   }
 
   private void openRegion(HRegionInfo regionInfo) throws IOException {
-    this.locker.writeLock().lock();
+    this.lock.obtainWriteLock();
     try {
       HRegion region =
         new HRegion(regionDir, log, fs, conf, regionInfo, null, oldlogfile);
@@ -729,14 +753,14 @@ public class HRegionServer
       reportOpen(region);
 
     } finally {
-      this.locker.writeLock().unlock();
+      this.lock.releaseWriteLock();
     }
   }
 
   private void closeRegion(HRegionInfo info, boolean reportWhenCompleted)
       throws IOException {
 
-    this.locker.writeLock().lock();
+    this.lock.obtainWriteLock();
     try {
       HRegion region = regions.remove(info.regionName);
@@ -749,13 +773,13 @@ public class HRegionServer
       }
 
     } finally {
-      this.locker.writeLock().unlock();
+      this.lock.releaseWriteLock();
     }
   }
 
   private void closeAndDeleteRegion(HRegionInfo info) throws IOException {
 
-    this.locker.writeLock().lock();
+    this.lock.obtainWriteLock();
     try {
       HRegion region = regions.remove(info.regionName);
@@ -764,13 +788,13 @@ public class HRegionServer
       }
 
     } finally {
-      this.locker.writeLock().unlock();
+      this.lock.releaseWriteLock();
     }
   }
 
   /** Called either when the master tells us to restart or from stop() */
   private void closeAllRegions() {
-    this.locker.writeLock().lock();
+    this.lock.obtainWriteLock();
     try {
       for(Iterator<HRegion> it = regions.values().iterator(); it.hasNext(); ) {
         HRegion region = it.next();
@@ -787,7 +811,7 @@ public class HRegionServer
       regions.clear();
 
     } finally {
-      this.locker.writeLock().unlock();
+      this.lock.releaseWriteLock();
     }
   }
@@ -1005,12 +1029,12 @@ public class HRegionServer
 
   /** Private utility method for safely obtaining an HRegion handle. */
   private HRegion getRegion(Text regionName) {
-    this.locker.readLock().lock();
+    this.lock.obtainReadLock();
     try {
       return regions.get(regionName);
 
    } finally {
-      this.locker.readLock().unlock();
+      this.lock.releaseReadLock();
    }
  }
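
The hunks above replace java.util.concurrent.locks.ReentrantReadWriteLock with a project-local HLocking class, but HLocking itself is not part of this diff; only its four calls (obtainReadLock/releaseReadLock and obtainWriteLock/releaseWriteLock) appear. As a rough illustration of the contract those calls imply, a minimal wait/notifyAll-based reader-writer lock might look like the sketch below. This is an assumption for illustration only: the method names come from the diff, everything else is guessed.

// Hypothetical sketch of the HLocking contract assumed by this patch:
// many concurrent readers, one exclusive writer, blocking acquisition.
// The real class may differ; only the four method names are taken from the diff.
public class HLocking {
  private final Object mutex = new Object();
  // Number of current holders: > 0 means that many readers, -1 means one writer.
  private int lockers = 0;

  public void obtainReadLock() {
    synchronized(mutex) {
      while(lockers < 0) {                  // block while a writer holds the lock
        try {
          mutex.wait();
        } catch(InterruptedException iex) {
        }
      }
      lockers++;
    }
  }

  public void releaseReadLock() {
    synchronized(mutex) {
      lockers--;
      mutex.notifyAll();                    // a waiting writer may now proceed
    }
  }

  public void obtainWriteLock() {
    synchronized(mutex) {
      while(lockers != 0) {                 // block until no readers and no writer
        try {
          mutex.wait();
        } catch(InterruptedException iex) {
        }
      }
      lockers = -1;
    }
  }

  public void releaseWriteLock() {
    synchronized(mutex) {
      lockers = 0;
      mutex.notifyAll();                    // wake waiting readers and writers
    }
  }
}

The new Integer(0) fields introduced by the patch (splitOrCompactLock, cacheFlusherLock, logRollerLock) play a different role: each worker thread holds its lock for the whole body of a loop iteration and sleeps outside it, so the synchronized blocks added to stop() can only deliver an interrupt while the thread is sleeping between iterations, never in the middle of a split, flush, or log roll. Any object serves as such a monitor; a plain new Object() would work equally well.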