|
@@ -35,6 +35,8 @@ import java.util.TimerTask;
|
|
|
import java.util.TreeMap;
|
|
|
import java.util.TreeSet;
|
|
|
import java.util.concurrent.BlockingQueue;
|
|
|
+import java.util.concurrent.Delayed;
|
|
|
+import java.util.concurrent.DelayQueue;
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
@@ -99,8 +101,10 @@ HMasterRegionInterface {
|
|
|
int numRetries;
|
|
|
long maxRegionOpenTime;
|
|
|
|
|
|
+ DelayQueue<PendingServerShutdown> shutdownQueue;
|
|
|
BlockingQueue<PendingOperation> msgQueue;
|
|
|
|
|
|
+ int leaseTimeout;
|
|
|
private Leases serverLeases;
|
|
|
private Server server;
|
|
|
private HServerAddress address;
|
|
@@ -860,11 +864,12 @@ HMasterRegionInterface {
|
|
|
this.numRetries = conf.getInt("hbase.client.retries.number", 2);
|
|
|
this.maxRegionOpenTime =
|
|
|
conf.getLong("hbase.hbasemaster.maxregionopen", 30 * 1000);
|
|
|
-
|
|
|
+
|
|
|
+ this.shutdownQueue = new DelayQueue<PendingServerShutdown>();
|
|
|
this.msgQueue = new LinkedBlockingQueue<PendingOperation>();
|
|
|
-
|
|
|
- this.serverLeases = new Leases(
|
|
|
- conf.getInt("hbase.master.lease.period", 30 * 1000),
|
|
|
+
|
|
|
+ this.leaseTimeout = conf.getInt("hbase.master.lease.period", 30 * 1000);
|
|
|
+ this.serverLeases = new Leases(this.leaseTimeout,
|
|
|
conf.getInt("hbase.master.lease.thread.wakefrequency", 15 * 1000));
|
|
|
|
|
|
this.server = RPC.getServer(this, address.getBindAddress(),
|
|
@@ -966,10 +971,13 @@ HMasterRegionInterface {
|
|
|
*/
|
|
|
try {
|
|
|
for (PendingOperation op = null; !closed.get(); ) {
|
|
|
- try {
|
|
|
- op = msgQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
|
|
|
- } catch (InterruptedException e) {
|
|
|
- // continue
|
|
|
+ op = shutdownQueue.poll();
|
|
|
+ if (op == null ) {
|
|
|
+ try {
|
|
|
+ op = msgQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ // continue
|
|
|
+ }
|
|
|
}
|
|
|
if (op == null || closed.get()) {
|
|
|
continue;
|
|
@@ -1117,6 +1125,7 @@ HMasterRegionInterface {
|
|
|
* HMasterRegionInterface
|
|
|
*/
|
|
|
|
|
|
+ /** {@inheritDoc} */
|
|
|
@SuppressWarnings("unused")
|
|
|
public MapWritable regionServerStartup(HServerInfo serverInfo)
|
|
|
throws IOException {
|
|
@@ -1140,11 +1149,7 @@ HMasterRegionInterface {
|
|
|
serversToServerInfo.notifyAll();
|
|
|
}
|
|
|
if (storedInfo != null && !closed.get()) {
|
|
|
- try {
|
|
|
- msgQueue.put(new PendingServerShutdown(storedInfo));
|
|
|
- } catch (InterruptedException e) {
|
|
|
- throw new RuntimeException("Putting into msgQueue was interrupted.", e);
|
|
|
- }
|
|
|
+ shutdownQueue.put(new PendingServerShutdown(storedInfo));
|
|
|
}
|
|
|
|
|
|
// Either way, record the new server
|
|
@@ -1683,9 +1688,12 @@ HMasterRegionInterface {
|
|
|
* The region server's log file needs to be split up for each region it was
|
|
|
* serving, and the regions need to get reassigned.
|
|
|
*/
|
|
|
- private class PendingServerShutdown extends PendingOperation {
|
|
|
+ private class PendingServerShutdown extends PendingOperation
|
|
|
+ implements Delayed {
|
|
|
+ private long delay;
|
|
|
private HServerAddress deadServer;
|
|
|
private String deadServerName;
|
|
|
+ private Path oldLogDir;
|
|
|
private transient boolean logSplit;
|
|
|
private transient boolean rootChecked;
|
|
|
private transient boolean rootRescanned;
|
|
@@ -1706,13 +1714,32 @@ HMasterRegionInterface {
|
|
|
|
|
|
PendingServerShutdown(HServerInfo serverInfo) {
|
|
|
super();
|
|
|
+ this.delay = System.currentTimeMillis() + leaseTimeout / 2; // absolute expiry time in ms, not a duration
|
|
|
this.deadServer = serverInfo.getServerAddress();
|
|
|
this.deadServerName = this.deadServer.toString();
|
|
|
this.logSplit = false;
|
|
|
this.rootChecked = false;
|
|
|
this.rootRescanned = false;
|
|
|
+ StringBuilder dirName = new StringBuilder("log_");
|
|
|
+ dirName.append(deadServer.getBindAddress());
|
|
|
+ dirName.append("_");
|
|
|
+ dirName.append(serverInfo.getStartCode());
|
|
|
+ dirName.append("_");
|
|
|
+ dirName.append(deadServer.getPort());
|
|
|
+ this.oldLogDir = new Path(dir, dirName.toString()); // log_<bindAddress>_<startCode>_<port> under dir
|
|
|
}
|
|
|
|
|
|
+ /** Returns the time left until this shutdown is due, in {@code unit}; zero or negative once due. */
|
|
|
+ public long getDelay(TimeUnit unit) {
|
|
|
+ return unit.convert(delay - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
|
|
|
+ }
|
|
|
+
|
|
|
+ /** Orders by remaining delay in ms; compared as longs so a large difference cannot be truncated to a wrong int. */
|
|
|
+ public int compareTo(Delayed o) {
|
|
|
+ return Long.valueOf(getDelay(TimeUnit.MILLISECONDS))
|
|
|
+ .compareTo(Long.valueOf(o.getDelay(TimeUnit.MILLISECONDS)));
|
|
|
+ }
|
|
|
+
|
|
|
/** {@inheritDoc} */
|
|
|
@Override
|
|
|
public String toString() {
|
|
@@ -1875,17 +1902,12 @@ HMasterRegionInterface {
|
|
|
|
|
|
if (!logSplit) {
|
|
|
// Process the old log file
|
|
|
- StringBuilder dirName = new StringBuilder("log_");
|
|
|
- dirName.append(deadServer.getBindAddress());
|
|
|
- dirName.append("_");
|
|
|
- dirName.append(deadServer.getPort());
|
|
|
- Path logdir = new Path(dir, dirName.toString());
|
|
|
- if (fs.exists(logdir)) {
|
|
|
+ if (fs.exists(oldLogDir)) {
|
|
|
if (!splitLogLock.tryLock()) {
|
|
|
return false;
|
|
|
}
|
|
|
try {
|
|
|
- HLog.splitLog(dir, logdir, fs, conf);
|
|
|
+ HLog.splitLog(dir, oldLogDir, fs, conf);
|
|
|
} finally {
|
|
|
splitLogLock.unlock();
|
|
|
}
|
|
@@ -2901,16 +2923,8 @@ HMasterRegionInterface {
|
|
|
// NOTE: If the server was serving the root region, we cannot reassign it
|
|
|
// here because the new server will start serving the root region before
|
|
|
// the PendingServerShutdown operation has a chance to split the log file.
|
|
|
- try {
|
|
|
- if (info != null) {
|
|
|
- msgQueue.put(new PendingServerShutdown(info));
|
|
|
- }
|
|
|
- } catch (InterruptedException e) {
|
|
|
- // continue. We used to throw a RuntimeException here but on exit
|
|
|
- // this put is often interrupted. For now, just log these iterrupts
|
|
|
- // rather than throw an exception
|
|
|
- LOG.debug("MsgQueue.put was interrupted (If we are exiting, this " +
|
|
|
- "msg can be ignored)");
|
|
|
+ if (info != null) {
|
|
|
+ shutdownQueue.put(new PendingServerShutdown(info));
|
|
|
}
|
|
|
}
|
|
|
}
|