|
@@ -99,6 +99,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
int numRetries;
|
|
int numRetries;
|
|
protected final int threadWakeFrequency;
|
|
protected final int threadWakeFrequency;
|
|
private final int msgInterval;
|
|
private final int msgInterval;
|
|
|
|
+ private final int serverLeaseTimeout;
|
|
|
|
|
|
// Remote HMaster
|
|
// Remote HMaster
|
|
private final HMasterRegionInterface hbaseMaster;
|
|
private final HMasterRegionInterface hbaseMaster;
|
|
@@ -384,6 +385,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
this.numRetries = conf.getInt("hbase.client.retries.number", 2);
|
|
this.numRetries = conf.getInt("hbase.client.retries.number", 2);
|
|
this.threadWakeFrequency = conf.getInt(THREAD_WAKE_FREQUENCY, 10 * 1000);
|
|
this.threadWakeFrequency = conf.getInt(THREAD_WAKE_FREQUENCY, 10 * 1000);
|
|
this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
|
|
this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
|
|
|
|
+ this.serverLeaseTimeout =
|
|
|
|
+ conf.getInt("hbase.master.lease.period", 30 * 1000);
|
|
|
|
|
|
// Cache flushing chore thread.
|
|
// Cache flushing chore thread.
|
|
this.cacheFlusherThread =
|
|
this.cacheFlusherThread =
|
|
@@ -427,11 +430,20 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
|
|
|
try {
|
|
try {
|
|
init(reportForDuty());
|
|
init(reportForDuty());
|
|
|
|
+ long lastMsg = 0;
|
|
while(!stopRequested.get()) {
|
|
while(!stopRequested.get()) {
|
|
- long lastMsg = 0;
|
|
|
|
// Now ask master what it wants us to do and tell it what we have done
|
|
// Now ask master what it wants us to do and tell it what we have done
|
|
for (int tries = 0; !stopRequested.get();) {
|
|
for (int tries = 0; !stopRequested.get();) {
|
|
- if ((System.currentTimeMillis() - lastMsg) >= msgInterval) {
|
|
|
|
|
|
+ long now = System.currentTimeMillis();
|
|
|
|
+ if (lastMsg != 0 && (now - lastMsg) >= serverLeaseTimeout) {
|
|
|
|
+ // It has been way too long since we last reported to the master.
|
|
|
|
+ // Commit suicide.
|
|
|
|
+ LOG.fatal("unable to report to master for " + (now - lastMsg) +
|
|
|
|
+ " milliseconds - aborting server");
|
|
|
|
+ abort();
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
|
|
+ if ((now - lastMsg) >= msgInterval) {
|
|
HMsg outboundArray[] = null;
|
|
HMsg outboundArray[] = null;
|
|
synchronized(outboundMsgs) {
|
|
synchronized(outboundMsgs) {
|
|
outboundArray =
|
|
outboundArray =
|
|
@@ -514,9 +526,10 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
stop();
|
|
stop();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- } // while (!stopRequested.get())
|
|
|
|
|
|
+ }
|
|
|
|
+
|
|
this.sleeper.sleep(lastMsg);
|
|
this.sleeper.sleep(lastMsg);
|
|
- }
|
|
|
|
|
|
+ } // while (!stopRequested.get())
|
|
}
|
|
}
|
|
} catch (Throwable t) {
|
|
} catch (Throwable t) {
|
|
LOG.fatal("Unhandled exception. Aborting...", t);
|
|
LOG.fatal("Unhandled exception. Aborting...", t);
|
|
@@ -743,12 +756,13 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
LOG.debug("Telling master we are up");
|
|
LOG.debug("Telling master we are up");
|
|
}
|
|
}
|
|
MapWritable result = null;
|
|
MapWritable result = null;
|
|
|
|
+ long lastMsg = 0;
|
|
while(!stopRequested.get()) {
|
|
while(!stopRequested.get()) {
|
|
- long lastMsg = 0;
|
|
|
|
try {
|
|
try {
|
|
this.requestCount.set(0);
|
|
this.requestCount.set(0);
|
|
this.serverInfo.setLoad(new HServerLoad(0, onlineRegions.size()));
|
|
this.serverInfo.setLoad(new HServerLoad(0, onlineRegions.size()));
|
|
result = this.hbaseMaster.regionServerStartup(serverInfo);
|
|
result = this.hbaseMaster.regionServerStartup(serverInfo);
|
|
|
|
+ lastMsg = System.currentTimeMillis();
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
LOG.debug("Done telling master we are up");
|
|
LOG.debug("Done telling master we are up");
|
|
}
|
|
}
|