|
@@ -19,9 +19,10 @@ import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collections;
|
|
|
import java.util.HashMap;
|
|
|
-import java.util.Iterator;
|
|
|
+import java.util.LinkedList;
|
|
|
import java.util.Map;
|
|
|
import java.util.Random;
|
|
|
+import java.util.SortedMap;
|
|
|
import java.util.TreeMap;
|
|
|
import java.util.Vector;
|
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
@@ -42,9 +43,11 @@ import org.apache.hadoop.util.StringUtils;
|
|
|
* HRegionServer makes a set of HRegions available to clients. It checks in with
|
|
|
* the HMaster. There are many HRegionServers in a single HBase deployment.
|
|
|
******************************************************************************/
|
|
|
-public class HRegionServer
|
|
|
- implements HConstants, HRegionInterface, Runnable {
|
|
|
+public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
|
|
|
+ /* (non-Javadoc)
|
|
|
+ * @see org.apache.hadoop.ipc.VersionedProtocol#getProtocolVersion(java.lang.String, long)
|
|
|
+ */
|
|
|
public long getProtocolVersion(final String protocol,
|
|
|
@SuppressWarnings("unused") final long clientVersion)
|
|
|
throws IOException {
|
|
@@ -57,18 +60,20 @@ public class HRegionServer
|
|
|
static final Log LOG = LogFactory.getLog(HRegionServer.class);
|
|
|
|
|
|
volatile boolean stopRequested;
|
|
|
- private Path regionDir;
|
|
|
+ volatile boolean abortRequested;
|
|
|
+ private Path rootDir;
|
|
|
HServerInfo info;
|
|
|
Configuration conf;
|
|
|
private Random rand;
|
|
|
|
|
|
// region name -> HRegion
|
|
|
- TreeMap<Text, HRegion> onlineRegions = new TreeMap<Text, HRegion>();
|
|
|
+ SortedMap<Text, HRegion> onlineRegions;
|
|
|
Map<Text, HRegion> retiringRegions = new HashMap<Text, HRegion>();
|
|
|
|
|
|
final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
|
|
private Vector<HMsg> outboundMsgs;
|
|
|
|
|
|
+ int numRetries;
|
|
|
long threadWakeFrequency;
|
|
|
private long msgInterval;
|
|
|
|
|
@@ -78,20 +83,24 @@ public class HRegionServer
|
|
|
private Thread splitOrCompactCheckerThread;
|
|
|
Integer splitOrCompactLock = Integer.valueOf(0);
|
|
|
|
|
|
- /*
|
|
|
+ /**
|
|
|
* Interface used by the {@link org.apache.hadoop.io.retry} mechanism.
|
|
|
*/
|
|
|
- interface UpdateMetaInterface {
|
|
|
- /*
|
|
|
+ public interface UpdateMetaInterface {
|
|
|
+ /**
|
|
|
* @return True if succeeded.
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- boolean update() throws IOException;
|
|
|
+ public boolean update() throws IOException;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ /** Runs periodically to determine if regions need to be compacted or split */
|
|
|
class SplitOrCompactChecker implements Runnable, RegionUnavailableListener {
|
|
|
HClient client = new HClient(conf);
|
|
|
|
|
|
+ /* (non-Javadoc)
|
|
|
+ * @see org.apache.hadoop.hbase.RegionUnavailableListener#closing(org.apache.hadoop.io.Text)
|
|
|
+ */
|
|
|
public void closing(final Text regionName) {
|
|
|
lock.writeLock().lock();
|
|
|
try {
|
|
@@ -106,6 +115,9 @@ public class HRegionServer
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /* (non-Javadoc)
|
|
|
+ * @see org.apache.hadoop.hbase.RegionUnavailableListener#closed(org.apache.hadoop.io.Text)
|
|
|
+ */
|
|
|
public void closed(final Text regionName) {
|
|
|
lock.writeLock().lock();
|
|
|
try {
|
|
@@ -118,6 +130,9 @@ public class HRegionServer
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /* (non-Javadoc)
|
|
|
+ * @see java.lang.Runnable#run()
|
|
|
+ */
|
|
|
public void run() {
|
|
|
while(! stopRequested) {
|
|
|
long startTime = System.currentTimeMillis();
|
|
@@ -180,7 +195,7 @@ public class HRegionServer
|
|
|
// splitting a 'normal' region, and the ROOT table needs to be
|
|
|
// updated if we are splitting a META region.
|
|
|
final Text tableToUpdate =
|
|
|
- (oldRegion.find(META_TABLE_NAME.toString()) == 0) ?
|
|
|
+ region.getRegionInfo().tableDesc.getName().equals(META_TABLE_NAME) ?
|
|
|
ROOT_TABLE_NAME : META_TABLE_NAME;
|
|
|
if(LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Updating " + tableToUpdate + " with region split info");
|
|
@@ -188,6 +203,9 @@ public class HRegionServer
|
|
|
|
|
|
// Wrap the update of META region with an org.apache.hadoop.io.retry.
|
|
|
UpdateMetaInterface implementation = new UpdateMetaInterface() {
|
|
|
+ /* (non-Javadoc)
|
|
|
+ * @see org.apache.hadoop.hbase.HRegionServer.UpdateMetaInterface#update()
|
|
|
+ */
|
|
|
public boolean update() throws IOException {
|
|
|
HRegion.removeRegionFromMETA(client, tableToUpdate,
|
|
|
region.getRegionName());
|
|
@@ -232,7 +250,11 @@ public class HRegionServer
|
|
|
private Flusher cacheFlusher;
|
|
|
private Thread cacheFlusherThread;
|
|
|
Integer cacheFlusherLock = Integer.valueOf(0);
|
|
|
+ /** Runs periodically to flush the memcache */
|
|
|
class Flusher implements Runnable {
|
|
|
+ /* (non-Javadoc)
|
|
|
+ * @see java.lang.Runnable#run()
|
|
|
+ */
|
|
|
public void run() {
|
|
|
while(! stopRequested) {
|
|
|
long startTime = System.currentTimeMillis();
|
|
@@ -283,21 +305,22 @@ public class HRegionServer
|
|
|
// File paths
|
|
|
|
|
|
private FileSystem fs;
|
|
|
- private Path oldlogfile;
|
|
|
|
|
|
// Logging
|
|
|
+
|
|
|
HLog log;
|
|
|
private LogRoller logRoller;
|
|
|
private Thread logRollerThread;
|
|
|
Integer logRollerLock = Integer.valueOf(0);
|
|
|
|
|
|
- /**
|
|
|
- * Log rolling Runnable.
|
|
|
- */
|
|
|
+ /** Runs periodically to determine if the log should be rolled */
|
|
|
class LogRoller implements Runnable {
|
|
|
private int maxLogEntries =
|
|
|
conf.getInt("hbase.regionserver.maxlogentries", 30 * 1000);
|
|
|
|
|
|
+ /* (non-Javadoc)
|
|
|
+ * @see java.lang.Runnable#run()
|
|
|
+ */
|
|
|
public void run() {
|
|
|
while(! stopRequested) {
|
|
|
synchronized(logRollerLock) {
|
|
@@ -339,48 +362,61 @@ public class HRegionServer
|
|
|
// Leases
|
|
|
private Leases leases;
|
|
|
|
|
|
- /** Start a HRegionServer at the default location */
|
|
|
+ /**
|
|
|
+ * Starts a HRegionServer at the default location
|
|
|
+ * @param conf
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
public HRegionServer(Configuration conf) throws IOException {
|
|
|
- this(new Path(conf.get(HREGION_DIR, DEFAULT_HREGION_DIR)),
|
|
|
+ this(new Path(conf.get(HBASE_DIR, DEFAULT_HBASE_DIR)),
|
|
|
new HServerAddress(conf.get(REGIONSERVER_ADDRESS, "localhost:0")),
|
|
|
conf);
|
|
|
}
|
|
|
|
|
|
- /** Start a HRegionServer at an indicated location */
|
|
|
- public HRegionServer(Path regionDir, HServerAddress address,
|
|
|
+ /**
|
|
|
+ * Starts a HRegionServer at the specified location
|
|
|
+ * @param rootDir
|
|
|
+ * @param address
|
|
|
+ * @param conf
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ public HRegionServer(Path rootDir, HServerAddress address,
|
|
|
Configuration conf) throws IOException {
|
|
|
|
|
|
// Basic setup
|
|
|
this.stopRequested = false;
|
|
|
- this.regionDir = regionDir;
|
|
|
+ this.abortRequested = false;
|
|
|
+ this.rootDir = rootDir;
|
|
|
this.conf = conf;
|
|
|
this.rand = new Random();
|
|
|
+ this.onlineRegions =
|
|
|
+ Collections.synchronizedSortedMap(new TreeMap<Text, HRegion>());
|
|
|
+
|
|
|
this.outboundMsgs = new Vector<HMsg>();
|
|
|
this.scanners =
|
|
|
Collections.synchronizedMap(new TreeMap<Text, HInternalScannerInterface>());
|
|
|
|
|
|
// Config'ed params
|
|
|
+ this.numRetries = conf.getInt("hbase.client.retries.number", 2);
|
|
|
this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
|
|
|
this.msgInterval = conf.getLong("hbase.regionserver.msginterval",
|
|
|
15 * 1000);
|
|
|
this.splitOrCompactCheckFrequency =
|
|
|
conf.getLong("hbase.regionserver.thread.splitcompactcheckfrequency",
|
|
|
60 * 1000);
|
|
|
-
|
|
|
+
|
|
|
// Cache flushing
|
|
|
this.cacheFlusher = new Flusher();
|
|
|
- this.cacheFlusherThread =
|
|
|
- new Thread(cacheFlusher, "HRegionServer.cacheFlusher");
|
|
|
+ this.cacheFlusherThread = new Thread(cacheFlusher);
|
|
|
|
|
|
// Check regions to see if they need to be split
|
|
|
this.splitOrCompactChecker = new SplitOrCompactChecker();
|
|
|
- this.splitOrCompactCheckerThread =
|
|
|
- new Thread(splitOrCompactChecker, "HRegionServer.splitOrCompactChecker");
|
|
|
+ this.splitOrCompactCheckerThread = new Thread(splitOrCompactChecker);
|
|
|
|
|
|
// Process requests from Master
|
|
|
- this.toDo = new Vector<HMsg>();
|
|
|
+ this.toDo = new LinkedList<ToDoEntry>();
|
|
|
this.worker = new Worker();
|
|
|
- this.workerThread = new Thread(worker, "HRegionServer.worker");
|
|
|
+ this.workerThread = new Thread(worker);
|
|
|
|
|
|
try {
|
|
|
// Server to handle client requests
|
|
@@ -398,20 +434,19 @@ public class HRegionServer
|
|
|
this.info.getServerAddress().getBindAddress() + "_"
|
|
|
+ this.info.getServerAddress().getPort();
|
|
|
|
|
|
- Path newlogdir = new Path(regionDir, "log" + "_" + serverName);
|
|
|
- this.oldlogfile = new Path(regionDir, "oldlogfile" + "_" + serverName);
|
|
|
+ Path logdir = new Path(rootDir, "log" + "_" + serverName);
|
|
|
|
|
|
// Logging
|
|
|
|
|
|
this.fs = FileSystem.get(conf);
|
|
|
- HLog.consolidateOldLog(newlogdir, oldlogfile, fs, conf);
|
|
|
- // TODO: Now we have a consolidated log for all regions, sort and
|
|
|
- // then split result by region passing the splits as reconstruction
|
|
|
- // logs to HRegions on start. Or, rather than consolidate, split logs
|
|
|
- // into per region files.
|
|
|
- this.log = new HLog(fs, newlogdir, conf);
|
|
|
+ if(fs.exists(logdir)) {
|
|
|
+ throw new RegionServerRunningException("region server already running at "
|
|
|
+ + this.info.getServerAddress().toString());
|
|
|
+ }
|
|
|
+
|
|
|
+ this.log = new HLog(fs, logdir, conf);
|
|
|
this.logRoller = new LogRoller();
|
|
|
- this.logRollerThread = new Thread(logRoller, "HRegionServer.logRoller");
|
|
|
+ this.logRollerThread = new Thread(logRoller);
|
|
|
|
|
|
// Remote HMaster
|
|
|
|
|
@@ -420,40 +455,37 @@ public class HRegionServer
|
|
|
new HServerAddress(conf.get(MASTER_ADDRESS)).getInetSocketAddress(),
|
|
|
conf);
|
|
|
|
|
|
- // Threads
|
|
|
-
|
|
|
- this.workerThread.start();
|
|
|
- this.cacheFlusherThread.start();
|
|
|
- this.splitOrCompactCheckerThread.start();
|
|
|
- this.logRollerThread.start();
|
|
|
- this.leases = new Leases(conf.getLong("hbase.regionserver.lease.period",
|
|
|
- 3 * 60 * 1000), threadWakeFrequency);
|
|
|
-
|
|
|
- // Server
|
|
|
-
|
|
|
- this.server.start();
|
|
|
-
|
|
|
} catch(IOException e) {
|
|
|
this.stopRequested = true;
|
|
|
throw e;
|
|
|
}
|
|
|
-
|
|
|
- LOG.info("HRegionServer started at: " + address.toString());
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Set a flag that will cause all the HRegionServer threads to shut down
|
|
|
+ * Sets a flag that will cause all the HRegionServer threads to shut down
|
|
|
* in an orderly fashion.
|
|
|
*/
|
|
|
- public synchronized void stop() {
|
|
|
+ synchronized void stop() {
|
|
|
stopRequested = true;
|
|
|
notifyAll(); // Wakes run() if it is sleeping
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Cause the server to exit without closing the regions it is serving, the
|
|
|
+ * log it is using and without notifying the master.
|
|
|
+ *
|
|
|
+ * FOR DEBUGGING ONLY
|
|
|
+ */
|
|
|
+ synchronized void abort() {
|
|
|
+ abortRequested = true;
|
|
|
+ stop();
|
|
|
+ }
|
|
|
|
|
|
- /** Wait on all threads to finish.
|
|
|
+ /**
|
|
|
+ * Wait on all threads to finish.
|
|
|
* Presumption is that all closes and stops have already been called.
|
|
|
*/
|
|
|
- public void join() {
|
|
|
+ void join() {
|
|
|
try {
|
|
|
this.workerThread.join();
|
|
|
} catch(InterruptedException iex) {
|
|
@@ -489,6 +521,33 @@ public class HRegionServer
|
|
|
* load/unload instructions.
|
|
|
*/
|
|
|
public void run() {
|
|
|
+
|
|
|
+ // Threads
|
|
|
+
|
|
|
+ String threadName = Thread.currentThread().getName();
|
|
|
+
|
|
|
+ workerThread.setName(threadName + ".worker");
|
|
|
+ workerThread.start();
|
|
|
+ cacheFlusherThread.setName(threadName + ".cacheFlusher");
|
|
|
+ cacheFlusherThread.start();
|
|
|
+ splitOrCompactCheckerThread.setName(threadName + ".splitOrCompactChecker");
|
|
|
+ splitOrCompactCheckerThread.start();
|
|
|
+ logRollerThread.setName(threadName + ".logRoller");
|
|
|
+ logRollerThread.start();
|
|
|
+ leases = new Leases(conf.getLong("hbase.regionserver.lease.period",
|
|
|
+ 3 * 60 * 1000), threadWakeFrequency);
|
|
|
+
|
|
|
+ // Server
|
|
|
+
|
|
|
+ try {
|
|
|
+ this.server.start();
|
|
|
+ LOG.info("HRegionServer started at: " + info.getServerAddress().toString());
|
|
|
+
|
|
|
+ } catch(IOException e) {
|
|
|
+ LOG.error(e);
|
|
|
+ stopRequested = true;
|
|
|
+ }
|
|
|
+
|
|
|
while(! stopRequested) {
|
|
|
long lastMsg = 0;
|
|
|
long waitTime;
|
|
@@ -545,7 +604,6 @@ public class HRegionServer
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Got call server startup message");
|
|
|
}
|
|
|
- toDo.clear();
|
|
|
closeAllRegions();
|
|
|
restart = true;
|
|
|
break;
|
|
@@ -554,8 +612,6 @@ public class HRegionServer
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Got regionserver stop message");
|
|
|
}
|
|
|
- toDo.clear();
|
|
|
- closeAllRegions();
|
|
|
stopRequested = true;
|
|
|
break;
|
|
|
|
|
@@ -563,19 +619,21 @@ public class HRegionServer
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Got default message");
|
|
|
}
|
|
|
- toDo.add(msgs[i]);
|
|
|
+ toDo.addLast(new ToDoEntry(msgs[i]));
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ if(restart || stopRequested) {
|
|
|
+ toDo.clear();
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
if(toDo.size() > 0) {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("notify on todo");
|
|
|
}
|
|
|
toDo.notifyAll();
|
|
|
}
|
|
|
- if(restart || stopRequested) {
|
|
|
- break;
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
} catch(IOException e) {
|
|
@@ -596,41 +654,65 @@ public class HRegionServer
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- try {
|
|
|
- HMsg[] exitMsg = { new HMsg(HMsg.MSG_REPORT_EXITING) };
|
|
|
- hbaseMaster.regionServerReport(info, exitMsg);
|
|
|
-
|
|
|
- } catch(IOException e) {
|
|
|
- LOG.warn(e);
|
|
|
+ this.worker.stop();
|
|
|
+ this.server.stop();
|
|
|
+ leases.close();
|
|
|
+
|
|
|
+ // Send interrupts to wake up threads if sleeping so they notice shutdown.
|
|
|
+
|
|
|
+ synchronized(logRollerLock) {
|
|
|
+ this.logRollerThread.interrupt();
|
|
|
}
|
|
|
- try {
|
|
|
- LOG.info("stopping server at: " + info.getServerAddress().toString());
|
|
|
|
|
|
- // Send interrupts to wake up threads if sleeping so they notice shutdown.
|
|
|
+ synchronized(cacheFlusherLock) {
|
|
|
+ this.cacheFlusherThread.interrupt();
|
|
|
+ }
|
|
|
+
|
|
|
+ synchronized(splitOrCompactLock) {
|
|
|
+ this.splitOrCompactCheckerThread.interrupt();
|
|
|
+ }
|
|
|
|
|
|
- synchronized(logRollerLock) {
|
|
|
- this.logRollerThread.interrupt();
|
|
|
+ if(abortRequested) {
|
|
|
+ try {
|
|
|
+ log.rollWriter();
|
|
|
+
|
|
|
+ } catch(IOException e) {
|
|
|
+ LOG.warn(e);
|
|
|
}
|
|
|
+ LOG.info("aborting server at: " + info.getServerAddress().toString());
|
|
|
|
|
|
- synchronized(cacheFlusherLock) {
|
|
|
- this.cacheFlusherThread.interrupt();
|
|
|
+ } else {
|
|
|
+ Vector<HRegion> closedRegions = closeAllRegions();
|
|
|
+ try {
|
|
|
+ log.closeAndDelete();
|
|
|
+
|
|
|
+ } catch(IOException e) {
|
|
|
+ LOG.error(e);
|
|
|
}
|
|
|
-
|
|
|
- synchronized(splitOrCompactLock) {
|
|
|
- this.splitOrCompactCheckerThread.interrupt();
|
|
|
+ try {
|
|
|
+ HMsg[] exitMsg = new HMsg[closedRegions.size() + 1];
|
|
|
+ exitMsg[0] = new HMsg(HMsg.MSG_REPORT_EXITING);
|
|
|
+
|
|
|
+ // Tell the master what regions we are/were serving
|
|
|
+
|
|
|
+ int i = 1;
|
|
|
+ for(HRegion region: closedRegions) {
|
|
|
+ exitMsg[i++] = new HMsg(HMsg.MSG_REPORT_CLOSE, region.getRegionInfo());
|
|
|
+ }
|
|
|
+
|
|
|
+ LOG.info("telling master that region server is shutting down at: "
|
|
|
+ +info.getServerAddress().toString());
|
|
|
+
|
|
|
+ hbaseMaster.regionServerReport(info, exitMsg);
|
|
|
+
|
|
|
+ } catch(IOException e) {
|
|
|
+ LOG.warn(e);
|
|
|
}
|
|
|
-
|
|
|
- this.worker.stop();
|
|
|
- this.server.stop();
|
|
|
+ LOG.info("stopping server at: " + info.getServerAddress().toString());
|
|
|
+ }
|
|
|
|
|
|
- closeAllRegions();
|
|
|
- log.close();
|
|
|
- leases.close();
|
|
|
- join();
|
|
|
+ join();
|
|
|
|
|
|
- } catch(IOException e) {
|
|
|
- LOG.error(e);
|
|
|
- }
|
|
|
if(LOG.isDebugEnabled()) {
|
|
|
LOG.debug("main thread exiting");
|
|
|
}
|
|
@@ -650,7 +732,7 @@ public class HRegionServer
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
+ /**
|
|
|
* Add to the outbound message buffer
|
|
|
*
|
|
|
* When a region splits, we need to tell the master that there are two new
|
|
@@ -671,89 +753,84 @@ public class HRegionServer
|
|
|
// HMaster-given operations
|
|
|
//////////////////////////////////////////////////////////////////////////////
|
|
|
|
|
|
- Vector<HMsg> toDo;
|
|
|
+ private static class ToDoEntry {
|
|
|
+ int tries;
|
|
|
+ HMsg msg;
|
|
|
+ ToDoEntry(HMsg msg) {
|
|
|
+ this.tries = 0;
|
|
|
+ this.msg = msg;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ LinkedList<ToDoEntry> toDo;
|
|
|
private Worker worker;
|
|
|
private Thread workerThread;
|
|
|
+ /** Thread that performs long running requests from the master */
|
|
|
class Worker implements Runnable {
|
|
|
- public void stop() {
|
|
|
+ void stop() {
|
|
|
synchronized(toDo) {
|
|
|
toDo.notifyAll();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /* (non-Javadoc)
|
|
|
+ * @see java.lang.Runnable#run()
|
|
|
+ */
|
|
|
public void run() {
|
|
|
- for(HMsg msg = null; !stopRequested; ) {
|
|
|
+ for(ToDoEntry e = null; !stopRequested; ) {
|
|
|
synchronized(toDo) {
|
|
|
while(toDo.size() == 0 && !stopRequested) {
|
|
|
try {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Wait on todo");
|
|
|
}
|
|
|
- toDo.wait();
|
|
|
+ toDo.wait(threadWakeFrequency);
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Wake on todo");
|
|
|
}
|
|
|
- } catch(InterruptedException e) {
|
|
|
+ } catch(InterruptedException ex) {
|
|
|
// continue
|
|
|
}
|
|
|
}
|
|
|
if(stopRequested) {
|
|
|
continue;
|
|
|
}
|
|
|
- msg = toDo.remove(0);
|
|
|
+ e = toDo.removeFirst();
|
|
|
}
|
|
|
|
|
|
try {
|
|
|
- switch(msg.getMsg()) {
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug(e.msg.toString());
|
|
|
+ }
|
|
|
+
|
|
|
+ switch(e.msg.getMsg()) {
|
|
|
|
|
|
case HMsg.MSG_REGION_OPEN: // Open a region
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("MSG_REGION_OPEN");
|
|
|
- }
|
|
|
- openRegion(msg.getRegionInfo());
|
|
|
+ openRegion(e.msg.getRegionInfo());
|
|
|
break;
|
|
|
|
|
|
case HMsg.MSG_REGION_CLOSE: // Close a region
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("MSG_REGION_CLOSE");
|
|
|
- }
|
|
|
- closeRegion(msg.getRegionInfo(), true);
|
|
|
+ closeRegion(e.msg.getRegionInfo(), true);
|
|
|
break;
|
|
|
|
|
|
- case HMsg.MSG_REGION_MERGE: // Merge two regions
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("MSG_REGION_MERGE");
|
|
|
- }
|
|
|
- //TODO ???
|
|
|
- throw new IOException("TODO: need to figure out merge");
|
|
|
- //break;
|
|
|
-
|
|
|
- case HMsg.MSG_CALL_SERVER_STARTUP: // Close regions, restart
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("MSG_CALL_SERVER_STARTUP");
|
|
|
- }
|
|
|
- closeAllRegions();
|
|
|
- continue;
|
|
|
-
|
|
|
- case HMsg.MSG_REGIONSERVER_STOP: // Go away
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("MSG_REGIONSERVER_STOP");
|
|
|
- }
|
|
|
- stopRequested = true;
|
|
|
- continue;
|
|
|
-
|
|
|
case HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT: // Close a region, don't reply
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("MSG_REGION_CLOSE_WITHOUT_REPORT");
|
|
|
- }
|
|
|
- closeRegion(msg.getRegionInfo(), false);
|
|
|
+ closeRegion(e.msg.getRegionInfo(), false);
|
|
|
break;
|
|
|
|
|
|
default:
|
|
|
- throw new IOException("Impossible state during msg processing. Instruction: " + msg);
|
|
|
+ throw new AssertionError(
|
|
|
+ "Impossible state during msg processing. Instruction: "
|
|
|
+ + e.msg.toString());
|
|
|
+ }
|
|
|
+ } catch(IOException ie) {
|
|
|
+ if(e.tries < numRetries) {
|
|
|
+ LOG.warn(ie);
|
|
|
+ e.tries++;
|
|
|
+ synchronized(toDo) {
|
|
|
+ toDo.addLast(e);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ LOG.error("unable to process message: " + e.msg.toString(), ie);
|
|
|
}
|
|
|
- } catch(IOException e) {
|
|
|
- LOG.error(e);
|
|
|
}
|
|
|
}
|
|
|
LOG.info("worker thread exiting");
|
|
@@ -761,15 +838,18 @@ public class HRegionServer
|
|
|
}
|
|
|
|
|
|
void openRegion(HRegionInfo regionInfo) throws IOException {
|
|
|
- this.lock.writeLock().lock();
|
|
|
- try {
|
|
|
- HRegion region =
|
|
|
- new HRegion(regionDir, log, fs, conf, regionInfo, null, oldlogfile);
|
|
|
- this.onlineRegions.put(region.getRegionName(), region);
|
|
|
- reportOpen(region);
|
|
|
- } finally {
|
|
|
- this.lock.writeLock().unlock();
|
|
|
+ HRegion region = onlineRegions.get(regionInfo.regionName);
|
|
|
+ if(region == null) {
|
|
|
+ region = new HRegion(rootDir, log, fs, conf, regionInfo, null);
|
|
|
+
|
|
|
+ this.lock.writeLock().lock();
|
|
|
+ try {
|
|
|
+ this.onlineRegions.put(region.getRegionName(), region);
|
|
|
+ } finally {
|
|
|
+ this.lock.writeLock().unlock();
|
|
|
+ }
|
|
|
}
|
|
|
+ reportOpen(region);
|
|
|
}
|
|
|
|
|
|
void closeRegion(final HRegionInfo hri, final boolean reportWhenCompleted)
|
|
@@ -791,7 +871,7 @@ public class HRegionServer
|
|
|
}
|
|
|
|
|
|
/** Called either when the master tells us to restart or from stop() */
|
|
|
- void closeAllRegions() {
|
|
|
+ Vector<HRegion> closeAllRegions() {
|
|
|
Vector<HRegion> regionsToClose = new Vector<HRegion>();
|
|
|
this.lock.writeLock().lock();
|
|
|
try {
|
|
@@ -800,8 +880,7 @@ public class HRegionServer
|
|
|
} finally {
|
|
|
this.lock.writeLock().unlock();
|
|
|
}
|
|
|
- for(Iterator<HRegion> it = regionsToClose.iterator(); it.hasNext(); ) {
|
|
|
- HRegion region = it.next();
|
|
|
+ for(HRegion region: regionsToClose) {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("closing region " + region.getRegionName());
|
|
|
}
|
|
@@ -812,106 +891,117 @@ public class HRegionServer
|
|
|
LOG.error("error closing region " + region.getRegionName(), e);
|
|
|
}
|
|
|
}
|
|
|
+ return regionsToClose;
|
|
|
}
|
|
|
|
|
|
//////////////////////////////////////////////////////////////////////////////
|
|
|
// HRegionInterface
|
|
|
//////////////////////////////////////////////////////////////////////////////
|
|
|
|
|
|
- /** Obtain a table descriptor for the given region */
|
|
|
- public HRegionInfo getRegionInfo(Text regionName)
|
|
|
+ /* (non-Javadoc)
|
|
|
+ * @see org.apache.hadoop.hbase.HRegionInterface#getRegionInfo(org.apache.hadoop.io.Text)
|
|
|
+ */
|
|
|
+ public HRegionInfo getRegionInfo(final Text regionName)
|
|
|
throws NotServingRegionException {
|
|
|
return getRegion(regionName).getRegionInfo();
|
|
|
}
|
|
|
|
|
|
- /** Get the indicated row/column */
|
|
|
- public BytesWritable get(Text regionName, Text row, Text column)
|
|
|
- throws IOException {
|
|
|
+ /* (non-Javadoc)
|
|
|
+ * @see org.apache.hadoop.hbase.HRegionInterface#get(org.apache.hadoop.io.Text, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text)
|
|
|
+ */
|
|
|
+ public BytesWritable get(final Text regionName, final Text row,
|
|
|
+ final Text column) throws IOException {
|
|
|
+
|
|
|
return getRegion(regionName).get(row, column);
|
|
|
}
|
|
|
|
|
|
- /** Get multiple versions of the indicated row/col */
|
|
|
- public BytesWritable[] get(Text regionName, Text row, Text column,
|
|
|
- int numVersions)
|
|
|
- throws IOException {
|
|
|
+ /* (non-Javadoc)
|
|
|
+ * @see org.apache.hadoop.hbase.HRegionInterface#get(org.apache.hadoop.io.Text, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text, int)
|
|
|
+ */
|
|
|
+ public BytesWritable[] get(final Text regionName, final Text row,
|
|
|
+ final Text column, final int numVersions) throws IOException {
|
|
|
+
|
|
|
return getRegion(regionName).get(row, column, numVersions);
|
|
|
}
|
|
|
|
|
|
- /** Get multiple timestamped versions of the indicated row/col */
|
|
|
- public BytesWritable[] get(Text regionName, Text row, Text column,
|
|
|
- long timestamp, int numVersions)
|
|
|
- throws IOException {
|
|
|
+ /* (non-Javadoc)
|
|
|
+ * @see org.apache.hadoop.hbase.HRegionInterface#get(org.apache.hadoop.io.Text, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text, long, int)
|
|
|
+ */
|
|
|
+ public BytesWritable[] get(final Text regionName, final Text row, final Text column,
|
|
|
+ final long timestamp, final int numVersions) throws IOException {
|
|
|
+
|
|
|
return getRegion(regionName).get(row, column, timestamp, numVersions);
|
|
|
}
|
|
|
|
|
|
- /** Get all the columns (along with their names) for a given row. */
|
|
|
- public LabelledData[] getRow(Text regionName, Text row) throws IOException {
|
|
|
+ /* (non-Javadoc)
|
|
|
+ * @see org.apache.hadoop.hbase.HRegionInterface#getRow(org.apache.hadoop.io.Text, org.apache.hadoop.io.Text)
|
|
|
+ */
|
|
|
+ public KeyedData[] getRow(final Text regionName, final Text row) throws IOException {
|
|
|
HRegion region = getRegion(regionName);
|
|
|
TreeMap<Text, BytesWritable> map = region.getFull(row);
|
|
|
- LabelledData result[] = new LabelledData[map.size()];
|
|
|
+ KeyedData result[] = new KeyedData[map.size()];
|
|
|
int counter = 0;
|
|
|
for (Map.Entry<Text, BytesWritable> es: map.entrySet()) {
|
|
|
- result[counter++] = new LabelledData(es.getKey(), es.getValue());
|
|
|
+ result[counter++] =
|
|
|
+ new KeyedData(new HStoreKey(row, es.getKey()), es.getValue());
|
|
|
}
|
|
|
return result;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Start an update to the HBase. This also creates a lease associated with
|
|
|
- * the caller.
|
|
|
+ /* (non-Javadoc)
|
|
|
+ * @see org.apache.hadoop.hbase.HRegionInterface#next(long)
|
|
|
*/
|
|
|
- private static class RegionListener extends LeaseListener {
|
|
|
- private HRegion localRegion;
|
|
|
- private long localLockId;
|
|
|
-
|
|
|
- public RegionListener(HRegion region, long lockId) {
|
|
|
- this.localRegion = region;
|
|
|
- this.localLockId = lockId;
|
|
|
- }
|
|
|
+ public KeyedData[] next(final long scannerId)
|
|
|
+ throws IOException {
|
|
|
|
|
|
- public void leaseExpired() {
|
|
|
- try {
|
|
|
- localRegion.abort(localLockId);
|
|
|
-
|
|
|
- } catch(IOException iex) {
|
|
|
- LOG.error(iex);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public LabelledData[] next(final long scannerId, final HStoreKey key)
|
|
|
- throws IOException {
|
|
|
Text scannerName = new Text(String.valueOf(scannerId));
|
|
|
HInternalScannerInterface s = scanners.get(scannerName);
|
|
|
if (s == null) {
|
|
|
- throw new UnknownScannerException("Name: " + scannerName + ", key " +
|
|
|
- key);
|
|
|
+ throw new UnknownScannerException("Name: " + scannerName);
|
|
|
}
|
|
|
leases.renewLease(scannerName, scannerName);
|
|
|
+
|
|
|
+ // Collect values to be returned here
|
|
|
+
|
|
|
+ ArrayList<KeyedData> values = new ArrayList<KeyedData>();
|
|
|
+
|
|
|
TreeMap<Text, BytesWritable> results = new TreeMap<Text, BytesWritable>();
|
|
|
- ArrayList<LabelledData> values = new ArrayList<LabelledData>();
|
|
|
- // Keep getting rows till we find one that has at least one non-deleted
|
|
|
- // column value.
|
|
|
+
|
|
|
+ // Keep getting rows until we find one that has at least one non-deleted column value
|
|
|
+
|
|
|
+ HStoreKey key = new HStoreKey();
|
|
|
while (s.next(key, results)) {
|
|
|
for(Map.Entry<Text, BytesWritable> e: results.entrySet()) {
|
|
|
+ HStoreKey k = new HStoreKey(key.getRow(), e.getKey(), key.getTimestamp());
|
|
|
BytesWritable val = e.getValue();
|
|
|
if(val.getSize() == DELETE_BYTES.getSize()
|
|
|
&& val.compareTo(DELETE_BYTES) == 0) {
|
|
|
// Column value is deleted. Don't return it.
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("skipping deleted value for key: " + k.toString());
|
|
|
+ }
|
|
|
continue;
|
|
|
}
|
|
|
- values.add(new LabelledData(e.getKey(), val));
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("adding value for key: " + k.toString());
|
|
|
+ }
|
|
|
+ values.add(new KeyedData(k, val));
|
|
|
}
|
|
|
- if (values.size() > 0) {
|
|
|
- // Row has something in it. Let it out. Else go get another row.
|
|
|
+ if(values.size() > 0) {
|
|
|
+ // Row has something in it. Return the value.
|
|
|
break;
|
|
|
}
|
|
|
- // Need to clear results before we go back up and call 'next' again.
|
|
|
+
|
|
|
+ // No data for this row, go get another.
|
|
|
+
|
|
|
results.clear();
|
|
|
}
|
|
|
- return values.toArray(new LabelledData[values.size()]);
|
|
|
+ return values.toArray(new KeyedData[values.size()]);
|
|
|
}
|
|
|
|
|
|
+ /* (non-Javadoc)
|
|
|
+ * @see org.apache.hadoop.hbase.HRegionInterface#startUpdate(org.apache.hadoop.io.Text, long, org.apache.hadoop.io.Text)
|
|
|
+ */
|
|
|
public long startUpdate(Text regionName, long clientid, Text row)
|
|
|
throws IOException {
|
|
|
HRegion region = getRegion(regionName);
|
|
@@ -923,7 +1013,29 @@ public class HRegionServer
|
|
|
return lockid;
|
|
|
}
|
|
|
|
|
|
- /** Add something to the HBase. */
|
|
|
+ /** Create a lease for an update. If it times out, the update is aborted */
|
|
|
+ private static class RegionListener implements LeaseListener {
|
|
|
+ private HRegion localRegion;
|
|
|
+ private long localLockId;
|
|
|
+
|
|
|
+ RegionListener(HRegion region, long lockId) {
|
|
|
+ this.localRegion = region;
|
|
|
+ this.localLockId = lockId;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void leaseExpired() {
|
|
|
+ try {
|
|
|
+ localRegion.abort(localLockId);
|
|
|
+
|
|
|
+ } catch(IOException iex) {
|
|
|
+ LOG.error(iex);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /* (non-Javadoc)
|
|
|
+ * @see org.apache.hadoop.hbase.HRegionInterface#put(org.apache.hadoop.io.Text, long, long, org.apache.hadoop.io.Text, org.apache.hadoop.io.BytesWritable)
|
|
|
+ */
|
|
|
public void put(Text regionName, long clientid, long lockid, Text column,
|
|
|
BytesWritable val) throws IOException {
|
|
|
HRegion region = getRegion(regionName, true);
|
|
@@ -932,7 +1044,9 @@ public class HRegionServer
|
|
|
region.put(lockid, column, val);
|
|
|
}
|
|
|
|
|
|
- /** Remove a cell from the HBase. */
|
|
|
+ /* (non-Javadoc)
|
|
|
+ * @see org.apache.hadoop.hbase.HRegionInterface#delete(org.apache.hadoop.io.Text, long, long, org.apache.hadoop.io.Text)
|
|
|
+ */
|
|
|
public void delete(Text regionName, long clientid, long lockid, Text column)
|
|
|
throws IOException {
|
|
|
HRegion region = getRegion(regionName);
|
|
@@ -941,7 +1055,9 @@ public class HRegionServer
|
|
|
region.delete(lockid, column);
|
|
|
}
|
|
|
|
|
|
- /** Abandon the transaction */
|
|
|
+ /* (non-Javadoc)
|
|
|
+ * @see org.apache.hadoop.hbase.HRegionInterface#abort(org.apache.hadoop.io.Text, long, long)
|
|
|
+ */
|
|
|
public void abort(Text regionName, long clientid, long lockid)
|
|
|
throws IOException {
|
|
|
HRegion region = getRegion(regionName, true);
|
|
@@ -950,7 +1066,9 @@ public class HRegionServer
|
|
|
region.abort(lockid);
|
|
|
}
|
|
|
|
|
|
- /** Confirm the transaction */
|
|
|
+ /* (non-Javadoc)
|
|
|
+ * @see org.apache.hadoop.hbase.HRegionInterface#commit(org.apache.hadoop.io.Text, long, long)
|
|
|
+ */
|
|
|
public void commit(Text regionName, long clientid, long lockid)
|
|
|
throws IOException {
|
|
|
HRegion region = getRegion(regionName, true);
|
|
@@ -959,13 +1077,16 @@ public class HRegionServer
|
|
|
region.commit(lockid);
|
|
|
}
|
|
|
|
|
|
- /** Don't let the client's lease expire just yet... */
|
|
|
+ /* (non-Javadoc)
|
|
|
+ * @see org.apache.hadoop.hbase.HRegionInterface#renewLease(long, long)
|
|
|
+ */
|
|
|
public void renewLease(long lockid, long clientid) throws IOException {
|
|
|
leases.renewLease(new Text(String.valueOf(clientid)),
|
|
|
new Text(String.valueOf(lockid)));
|
|
|
}
|
|
|
|
|
|
- /** Private utility method for safely obtaining an HRegion handle.
|
|
|
+ /**
|
|
|
+ * Private utility method for safely obtaining an HRegion handle.
|
|
|
* @param regionName Name of online {@link HRegion} to return
|
|
|
* @return {@link HRegion} for <code>regionName</code>
|
|
|
* @throws NotServingRegionException
|
|
@@ -975,7 +1096,8 @@ public class HRegionServer
|
|
|
return getRegion(regionName, false);
|
|
|
}
|
|
|
|
|
|
- /** Private utility method for safely obtaining an HRegion handle.
|
|
|
+ /**
|
|
|
+ * Private utility method for safely obtaining an HRegion handle.
|
|
|
* @param regionName Name of online {@link HRegion} to return
|
|
|
* @param checkRetiringRegions Set true if we're to check retiring regions
|
|
|
* as well as online regions.
|
|
@@ -1013,14 +1135,21 @@ public class HRegionServer
|
|
|
//////////////////////////////////////////////////////////////////////////////
|
|
|
|
|
|
Map<Text, HInternalScannerInterface> scanners;
|
|
|
-
|
|
|
- private class ScannerListener extends LeaseListener {
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Instantiated as a scanner lease.
|
|
|
+ * If the lease times out, the scanner is closed
|
|
|
+ */
|
|
|
+ private class ScannerListener implements LeaseListener {
|
|
|
private Text scannerName;
|
|
|
|
|
|
- public ScannerListener(Text scannerName) {
|
|
|
+ ScannerListener(Text scannerName) {
|
|
|
this.scannerName = scannerName;
|
|
|
}
|
|
|
|
|
|
+ /* (non-Javadoc)
|
|
|
+ * @see org.apache.hadoop.hbase.LeaseListener#leaseExpired()
|
|
|
+ */
|
|
|
public void leaseExpired() {
|
|
|
LOG.info("Scanner " + scannerName + " lease expired");
|
|
|
HInternalScannerInterface s = null;
|
|
@@ -1033,7 +1162,9 @@ public class HRegionServer
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /** Start a scanner for a given HRegion. */
|
|
|
+ /* (non-Javadoc)
|
|
|
+ * @see org.apache.hadoop.hbase.HRegionInterface#openScanner(org.apache.hadoop.io.Text, org.apache.hadoop.io.Text[], org.apache.hadoop.io.Text)
|
|
|
+ */
|
|
|
public long openScanner(Text regionName, Text[] cols, Text firstRow)
|
|
|
throws IOException {
|
|
|
HRegion r = getRegion(regionName);
|
|
@@ -1054,6 +1185,9 @@ public class HRegionServer
|
|
|
return scannerId;
|
|
|
}
|
|
|
|
|
|
+ /* (non-Javadoc)
|
|
|
+ * @see org.apache.hadoop.hbase.HRegionInterface#close(long)
|
|
|
+ */
|
|
|
public void close(long scannerId) throws IOException {
|
|
|
Text scannerName = new Text(String.valueOf(scannerId));
|
|
|
HInternalScannerInterface s = null;
|
|
@@ -1080,6 +1214,9 @@ public class HRegionServer
|
|
|
System.exit(0);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * @param args
|
|
|
+ */
|
|
|
public static void main(String [] args) {
|
|
|
if (args.length < 1) {
|
|
|
printUsageAndExit();
|
|
@@ -1100,7 +1237,7 @@ public class HRegionServer
|
|
|
try {
|
|
|
(new Thread(new HRegionServer(conf))).start();
|
|
|
} catch (Throwable t) {
|
|
|
- LOG.error( "Can not start master because "+
|
|
|
+ LOG.error( "Can not start region server because "+
|
|
|
StringUtils.stringifyException(t) );
|
|
|
System.exit(-1);
|
|
|
}
|