|
@@ -217,77 +217,49 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
this.expirationTime = expirationTime;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- // Check to see if regions should be split
|
|
|
- final Splitter splitter;
|
|
|
- // Needed at shutdown. On way out, if can get this lock then we are not in
|
|
|
- // middle of a split or compaction: i.e. splits/compactions cannot be
|
|
|
- // interrupted.
|
|
|
- final Integer splitterLock = new Integer(0);
|
|
|
-
|
|
|
- /** Split regions on request */
|
|
|
- class Splitter extends Thread implements RegionUnavailableListener {
|
|
|
- private final BlockingQueue<QueueEntry> splitQueue =
|
|
|
- new LinkedBlockingQueue<QueueEntry>();
|
|
|
|
|
|
+ // Compactions
|
|
|
+ final CompactSplitThread compactSplitThread;
|
|
|
+ // Needed during shutdown so we send an interrupt after completion of a
|
|
|
+ // compaction, not in the midst.
|
|
|
+ final Integer compactSplitLock = new Integer(0);
|
|
|
+
|
|
|
+ /** Compact region on request and then run split if appropriate
|
|
|
+ */
|
|
|
+ private class CompactSplitThread extends Thread
|
|
|
+ implements RegionUnavailableListener {
|
|
|
private HTable root = null;
|
|
|
private HTable meta = null;
|
|
|
private long startTime;
|
|
|
+ private final long frequency;
|
|
|
+
|
|
|
+ private final BlockingQueue<QueueEntry> compactionQueue =
|
|
|
+ new LinkedBlockingQueue<QueueEntry>();
|
|
|
|
|
|
/** constructor */
|
|
|
- public Splitter() {
|
|
|
+ public CompactSplitThread() {
|
|
|
super();
|
|
|
- }
|
|
|
-
|
|
|
- /** {@inheritDoc} */
|
|
|
- public void closing(final Text regionName) {
|
|
|
- startTime = System.currentTimeMillis();
|
|
|
- lock.writeLock().lock();
|
|
|
- try {
|
|
|
- // Remove region from regions Map and add it to the Map of retiring
|
|
|
- // regions.
|
|
|
- retiringRegions.put(regionName, onlineRegions.remove(regionName));
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug(regionName.toString() + " closing (" +
|
|
|
- "Adding to retiringRegions)");
|
|
|
- }
|
|
|
- } finally {
|
|
|
- lock.writeLock().unlock();
|
|
|
- }
|
|
|
+ this.frequency =
|
|
|
+ conf.getLong("hbase.regionserver.thread.splitcompactcheckfrequency",
|
|
|
+ 20 * 1000);
|
|
|
}
|
|
|
|
|
|
/** {@inheritDoc} */
|
|
|
- public void closed(final Text regionName) {
|
|
|
- lock.writeLock().lock();
|
|
|
- try {
|
|
|
- retiringRegions.remove(regionName);
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug(regionName.toString() + " closed");
|
|
|
- }
|
|
|
- } finally {
|
|
|
- lock.writeLock().unlock();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Perform region splits if necessary
|
|
|
- */
|
|
|
@Override
|
|
|
public void run() {
|
|
|
while (!stopRequested.get()) {
|
|
|
QueueEntry e = null;
|
|
|
try {
|
|
|
- e = splitQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
|
|
|
+ e = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS);
|
|
|
if (e == null) {
|
|
|
continue;
|
|
|
}
|
|
|
- synchronized (splitterLock) { // Don't interrupt us while we're working
|
|
|
- split(e.getRegion());
|
|
|
- }
|
|
|
+ e.getRegion().compactIfNeeded();
|
|
|
+ split(e.getRegion());
|
|
|
} catch (InterruptedException ex) {
|
|
|
continue;
|
|
|
} catch (IOException ex) {
|
|
|
- LOG.error("Split failed" +
|
|
|
+ LOG.error("Compaction failed" +
|
|
|
(e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
|
|
|
RemoteExceptionHandler.checkIOException(ex));
|
|
|
if (!checkFileSystem()) {
|
|
@@ -295,7 +267,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
}
|
|
|
|
|
|
} catch (Exception ex) {
|
|
|
- LOG.error("Split failed" +
|
|
|
+ LOG.error("Compaction failed" +
|
|
|
(e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
|
|
|
ex);
|
|
|
if (!checkFileSystem()) {
|
|
@@ -307,18 +279,22 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * @param e entry indicating which region needs to be split
|
|
|
+ * @param e QueueEntry for region to be compacted
|
|
|
*/
|
|
|
- public void splitRequested(QueueEntry e) {
|
|
|
- splitQueue.add(e);
|
|
|
+ public void compactionRequested(QueueEntry e) {
|
|
|
+ compactionQueue.add(e);
|
|
|
+ }
|
|
|
+
|
|
|
+ void compactionRequested(final HRegion r) {
|
|
|
+ compactionRequested(new QueueEntry(r, System.currentTimeMillis()));
|
|
|
}
|
|
|
|
|
|
private void split(final HRegion region) throws IOException {
|
|
|
final HRegionInfo oldRegionInfo = region.getRegionInfo();
|
|
|
final HRegion[] newRegions = region.splitRegion(this);
|
|
|
-
|
|
|
if (newRegions == null) {
|
|
|
- return; // Didn't need to be split
|
|
|
+ // Didn't need to be split
|
|
|
+ return;
|
|
|
}
|
|
|
|
|
|
// When a region is split, the META table needs to updated if we're
|
|
@@ -374,65 +350,35 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
|
|
|
// Do not serve the new regions. Let the Master assign them.
|
|
|
}
|
|
|
- }
|
|
|
-
|
|
|
- // Compactions
|
|
|
- final Compactor compactor;
|
|
|
- // Needed during shutdown so we send an interrupt after completion of a
|
|
|
- // compaction, not in the midst.
|
|
|
- final Integer compactionLock = new Integer(0);
|
|
|
-
|
|
|
- /** Compact region on request */
|
|
|
- class Compactor extends Thread {
|
|
|
- private final BlockingQueue<QueueEntry> compactionQueue =
|
|
|
- new LinkedBlockingQueue<QueueEntry>();
|
|
|
-
|
|
|
- /** constructor */
|
|
|
- public Compactor() {
|
|
|
- super();
|
|
|
- }
|
|
|
|
|
|
/** {@inheritDoc} */
|
|
|
- @Override
|
|
|
- public void run() {
|
|
|
- while (!stopRequested.get()) {
|
|
|
- QueueEntry e = null;
|
|
|
- try {
|
|
|
- e = compactionQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
|
|
|
- if (e == null) {
|
|
|
- continue;
|
|
|
- }
|
|
|
- if (e.getRegion().compactIfNeeded()) {
|
|
|
- splitter.splitRequested(e);
|
|
|
- }
|
|
|
-
|
|
|
- } catch (InterruptedException ex) {
|
|
|
- continue;
|
|
|
- } catch (IOException ex) {
|
|
|
- LOG.error("Compaction failed" +
|
|
|
- (e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
|
|
|
- RemoteExceptionHandler.checkIOException(ex));
|
|
|
- if (!checkFileSystem()) {
|
|
|
- break;
|
|
|
- }
|
|
|
-
|
|
|
- } catch (Exception ex) {
|
|
|
- LOG.error("Compaction failed" +
|
|
|
- (e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
|
|
|
- ex);
|
|
|
- if (!checkFileSystem()) {
|
|
|
- break;
|
|
|
- }
|
|
|
+ public void closing(final Text regionName) {
|
|
|
+ startTime = System.currentTimeMillis();
|
|
|
+ lock.writeLock().lock();
|
|
|
+ try {
|
|
|
+ // Remove region from regions Map and add it to the Map of retiring
|
|
|
+ // regions.
|
|
|
+ retiringRegions.put(regionName, onlineRegions.remove(regionName));
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug(regionName.toString() + " closing (" +
|
|
|
+ "Adding to retiringRegions)");
|
|
|
}
|
|
|
+ } finally {
|
|
|
+ lock.writeLock().unlock();
|
|
|
}
|
|
|
- LOG.info(getName() + " exiting");
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * @param e QueueEntry for region to be compacted
|
|
|
- */
|
|
|
- public void compactionRequested(QueueEntry e) {
|
|
|
- compactionQueue.add(e);
|
|
|
+ /** {@inheritDoc} */
|
|
|
+ public void closed(final Text regionName) {
|
|
|
+ lock.writeLock().lock();
|
|
|
+ try {
|
|
|
+ retiringRegions.remove(regionName);
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug(regionName.toString() + " closed");
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ lock.writeLock().unlock();
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -469,7 +415,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
}
|
|
|
synchronized(cacheFlusherLock) { // Don't interrupt while we're working
|
|
|
if (e.getRegion().flushcache()) {
|
|
|
- compactor.compactionRequested(e);
|
|
|
+ compactSplitThread.compactionRequested(e);
|
|
|
}
|
|
|
|
|
|
e.setExpirationTime(System.currentTimeMillis() +
|
|
@@ -650,10 +596,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
this.cacheFlusher = new Flusher();
|
|
|
|
|
|
// Compaction thread
|
|
|
- this.compactor = new Compactor();
|
|
|
-
|
|
|
- // Region split thread
|
|
|
- this.splitter = new Splitter();
|
|
|
+ this.compactSplitThread = new CompactSplitThread();
|
|
|
|
|
|
// Log rolling thread
|
|
|
this.logRoller = new LogRoller();
|
|
@@ -846,11 +789,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
synchronized(cacheFlusherLock) {
|
|
|
this.cacheFlusher.interrupt();
|
|
|
}
|
|
|
- synchronized (compactionLock) {
|
|
|
- this.compactor.interrupt();
|
|
|
- }
|
|
|
- synchronized (splitterLock) {
|
|
|
- this.splitter.interrupt();
|
|
|
+ synchronized (compactSplitLock) {
|
|
|
+ this.compactSplitThread.interrupt();
|
|
|
}
|
|
|
synchronized (logRollerLock) {
|
|
|
this.logRoller.interrupt();
|
|
@@ -972,9 +912,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
handler);
|
|
|
Threads.setDaemonThreadRunning(this.cacheFlusher, n + ".cacheFlusher",
|
|
|
handler);
|
|
|
- Threads.setDaemonThreadRunning(this.compactor, n + ".compactor",
|
|
|
+ Threads.setDaemonThreadRunning(this.compactSplitThread, n + ".compactor",
|
|
|
handler);
|
|
|
- Threads.setDaemonThreadRunning(this.splitter, n + ".splitter", handler);
|
|
|
Threads.setDaemonThreadRunning(this.workerThread, n + ".worker", handler);
|
|
|
// Leases is not a Thread. Internally it runs a daemon thread. If it gets
|
|
|
// an unhandled exception, it will just exit.
|
|
@@ -1038,8 +977,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
void join() {
|
|
|
join(this.workerThread);
|
|
|
join(this.cacheFlusher);
|
|
|
- join(this.compactor);
|
|
|
- join(this.splitter);
|
|
|
+ join(this.compactSplitThread);
|
|
|
join(this.logRoller);
|
|
|
}
|
|
|
|
|
@@ -1219,7 +1157,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
),
|
|
|
this.log, this.fs, conf, regionInfo, null, this.cacheFlusher
|
|
|
);
|
|
|
-
|
|
|
+ // Startup a compaction early if one is needed.
|
|
|
+ this.compactSplitThread.compactionRequested(region);
|
|
|
} catch (IOException e) {
|
|
|
LOG.error("error opening region " + regionInfo.getRegionName(), e);
|
|
|
|