|
@@ -23,6 +23,7 @@ import java.io.IOException;
|
|
|
import java.lang.Thread.UncaughtExceptionHandler;
|
|
|
import java.lang.reflect.Constructor;
|
|
|
import java.net.InetSocketAddress;
|
|
|
+import java.net.UnknownHostException;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collections;
|
|
|
import java.util.HashMap;
|
|
@@ -55,6 +56,7 @@ import org.apache.hadoop.hbase.util.Threads;
|
|
|
import org.apache.hadoop.hbase.util.Writables;
|
|
|
import org.apache.hadoop.io.MapWritable;
|
|
|
import org.apache.hadoop.io.Text;
|
|
|
+import org.apache.hadoop.io.Writable;
|
|
|
import org.apache.hadoop.ipc.RPC;
|
|
|
import org.apache.hadoop.ipc.Server;
|
|
|
import org.apache.hadoop.net.DNS;
|
|
@@ -67,22 +69,12 @@ import org.apache.hadoop.util.StringUtils;
|
|
|
public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
static final Log LOG = LogFactory.getLog(HRegionServer.class);
|
|
|
|
|
|
- /** {@inheritDoc} */
|
|
|
- public long getProtocolVersion(final String protocol,
|
|
|
- @SuppressWarnings("unused") final long clientVersion)
|
|
|
- throws IOException {
|
|
|
- if (protocol.equals(HRegionInterface.class.getName())) {
|
|
|
- return HRegionInterface.versionID;
|
|
|
- }
|
|
|
- throw new IOException("Unknown protocol to name node: " + protocol);
|
|
|
- }
|
|
|
-
|
|
|
// Set when a report to the master comes back with a message asking us to
|
|
|
// shutdown. Also set by call to stop when debugging or running unit tests
|
|
|
// of HRegionServer in isolation. We use AtomicBoolean rather than
|
|
|
// plain boolean so we can pass a reference to Chore threads. Otherwise,
|
|
|
// Chore threads need to know about the hosting class.
|
|
|
- protected AtomicBoolean stopRequested = new AtomicBoolean(false);
|
|
|
+ protected final AtomicBoolean stopRequested = new AtomicBoolean(false);
|
|
|
|
|
|
// Go down hard. Used if file system becomes unavailable and also in
|
|
|
// debugging and unit tests.
|
|
@@ -91,38 +83,35 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
// If false, the file system has become unavailable
|
|
|
protected volatile boolean fsOk;
|
|
|
|
|
|
- final Path rootDir;
|
|
|
protected final HServerInfo serverInfo;
|
|
|
protected final Configuration conf;
|
|
|
- private final Random rand;
|
|
|
+ private final Random rand = new Random();
|
|
|
|
|
|
// region name -> HRegion
|
|
|
- protected final SortedMap<Text, HRegion> onlineRegions;
|
|
|
+ protected final SortedMap<Text, HRegion> onlineRegions =
|
|
|
+ Collections.synchronizedSortedMap(new TreeMap<Text, HRegion>());
|
|
|
protected final Map<Text, HRegion> retiringRegions =
|
|
|
new HashMap<Text, HRegion>();
|
|
|
|
|
|
protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
|
|
- private final Vector<HMsg> outboundMsgs;
|
|
|
+ private final Vector<HMsg> outboundMsgs = new Vector<HMsg>();
|
|
|
|
|
|
int numRetries;
|
|
|
protected final int threadWakeFrequency;
|
|
|
private final int msgInterval;
|
|
|
-
|
|
|
- // File paths
|
|
|
- private FileSystem fs;
|
|
|
|
|
|
// Remote HMaster
|
|
|
- private HMasterRegionInterface hbaseMaster;
|
|
|
+ private final HMasterRegionInterface hbaseMaster;
|
|
|
|
|
|
// Server to handle client requests. Default access so can be accessed by
|
|
|
// unit tests.
|
|
|
- Server server;
|
|
|
+ final Server server;
|
|
|
|
|
|
// Leases
|
|
|
- private Leases leases;
|
|
|
+ private final Leases leases;
|
|
|
|
|
|
// Request counter
|
|
|
- private AtomicInteger requestCount;
|
|
|
+ private final AtomicInteger requestCount = new AtomicInteger();
|
|
|
|
|
|
// A sleeper that sleeps for msgInterval.
|
|
|
private final Sleeper sleeper;
|
|
@@ -134,7 +123,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
// interrupted.
|
|
|
protected final Integer splitOrCompactLock = new Integer(0);
|
|
|
|
|
|
- /**
|
|
|
+ /*
|
|
|
* Runs periodically to determine if regions need to be compacted or split
|
|
|
*/
|
|
|
class SplitOrCompactChecker extends Chore
|
|
@@ -150,7 +139,6 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
30 * 1000), stop);
|
|
|
}
|
|
|
|
|
|
- /** {@inheritDoc} */
|
|
|
public void closing(final Text regionName) {
|
|
|
lock.writeLock().lock();
|
|
|
try {
|
|
@@ -166,7 +154,6 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /** {@inheritDoc} */
|
|
|
public void closed(final Text regionName) {
|
|
|
lock.writeLock().lock();
|
|
|
try {
|
|
@@ -290,7 +277,6 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
super(period, stop);
|
|
|
}
|
|
|
|
|
|
- /** {@inheritDoc} */
|
|
|
@Override
|
|
|
protected void chore() {
|
|
|
synchronized(cacheFlusherLock) {
|
|
@@ -326,8 +312,9 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // HLog and HLog roller.
|
|
|
- protected final HLog log;
|
|
|
+ // HLog and HLog roller. log is protected rather than private to avoid
|
|
|
+ // eclipse warning when accessed by inner classes
|
|
|
+ protected HLog log;
|
|
|
private final Thread logRollerThread;
|
|
|
protected final Integer logRollerLock = new Integer(0);
|
|
|
|
|
@@ -375,32 +362,21 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
public HRegionServer(Configuration conf) throws IOException {
|
|
|
- this(new Path(conf.get(HBASE_DIR, DEFAULT_HBASE_DIR)),
|
|
|
- new HServerAddress(conf.get(REGIONSERVER_ADDRESS,
|
|
|
- DEFAULT_REGIONSERVER_ADDRESS)),
|
|
|
- conf);
|
|
|
+ this(new HServerAddress(conf.get(REGIONSERVER_ADDRESS,
|
|
|
+ DEFAULT_REGIONSERVER_ADDRESS)), conf);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Starts a HRegionServer at the specified location
|
|
|
- * @param rootDir
|
|
|
* @param address
|
|
|
* @param conf
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- public HRegionServer(Path rootDir, HServerAddress address,
|
|
|
- Configuration conf)
|
|
|
+ public HRegionServer(HServerAddress address, Configuration conf)
|
|
|
throws IOException {
|
|
|
this.abortRequested = false;
|
|
|
this.fsOk = true;
|
|
|
- this.rootDir = rootDir;
|
|
|
this.conf = conf;
|
|
|
- this.rand = new Random();
|
|
|
- this.onlineRegions =
|
|
|
- Collections.synchronizedSortedMap(new TreeMap<Text, HRegion>());
|
|
|
-
|
|
|
- this.outboundMsgs = new Vector<HMsg>();
|
|
|
- this.requestCount = new AtomicInteger();
|
|
|
|
|
|
// Config'ed params
|
|
|
this.numRetries = conf.getInt("hbase.client.retries.number", 2);
|
|
@@ -416,112 +392,26 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
new SplitOrCompactChecker(this.stopRequested);
|
|
|
|
|
|
// Task thread to process requests from Master
|
|
|
- this.toDo = new LinkedBlockingQueue<ToDoEntry>();
|
|
|
this.worker = new Worker();
|
|
|
this.workerThread = new Thread(worker);
|
|
|
this.sleeper = new Sleeper(this.msgInterval, this.stopRequested);
|
|
|
-
|
|
|
- try {
|
|
|
- // Server to handle client requests
|
|
|
- this.server = RPC.getServer(this, address.getBindAddress(),
|
|
|
- address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
|
|
|
- false, conf);
|
|
|
-
|
|
|
- // Use interface to get the 'real' IP for this host.
|
|
|
- // 'serverInfo' is sent to master. Should have the real IP of this host
|
|
|
- // rather than 'localhost' or 0.0.0.0 or 127.0.0.1 in it.
|
|
|
- String realIP = DNS.getDefaultIP(
|
|
|
- conf.get("dfs.datanode.dns.interface","default"));
|
|
|
- this.serverInfo = new HServerInfo(new HServerAddress(
|
|
|
- new InetSocketAddress(realIP, server.getListenerAddress().getPort())),
|
|
|
- this.rand.nextLong());
|
|
|
- Path logdir = new Path(rootDir, "log" + "_" + realIP + "_" +
|
|
|
- this.serverInfo.getServerAddress().getPort());
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Log dir " + logdir);
|
|
|
- }
|
|
|
-
|
|
|
- // Logging
|
|
|
- this.fs = FileSystem.get(conf);
|
|
|
- if(fs.exists(logdir)) {
|
|
|
- throw new RegionServerRunningException("region server already " +
|
|
|
- "running at " + this.serverInfo.getServerAddress().toString() +
|
|
|
- " because logdir " + logdir.toString() + " exists");
|
|
|
- }
|
|
|
-
|
|
|
- this.log = new HLog(fs, logdir, conf);
|
|
|
- this.logRollerThread =
|
|
|
- new LogRoller(this.threadWakeFrequency, stopRequested);
|
|
|
-
|
|
|
- // Remote HMaster
|
|
|
- this.hbaseMaster = (HMasterRegionInterface)RPC.waitForProxy(
|
|
|
- HMasterRegionInterface.class, HMasterRegionInterface.versionID,
|
|
|
- new HServerAddress(conf.get(MASTER_ADDRESS)).getInetSocketAddress(),
|
|
|
- conf);
|
|
|
- } catch (IOException e) {
|
|
|
- this.stopRequested.set(true);
|
|
|
- throw RemoteExceptionHandler.checkIOException(e);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /** @return the HLog */
|
|
|
- HLog getLog() {
|
|
|
- return log;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Sets a flag that will cause all the HRegionServer threads to shut down
|
|
|
- * in an orderly fashion. Used by unit tests and called by {@link Flusher}
|
|
|
- * if it judges server needs to be restarted.
|
|
|
- */
|
|
|
- synchronized void stop() {
|
|
|
- this.stopRequested.set(true);
|
|
|
- notifyAll(); // Wakes run() if it is sleeping
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Cause the server to exit without closing the regions it is serving, the
|
|
|
- * log it is using and without notifying the master.
|
|
|
- * Used unit testing and on catastrophic events such as HDFS is yanked out
|
|
|
- * from under hbase or we OOME.
|
|
|
- */
|
|
|
- synchronized void abort() {
|
|
|
- this.abortRequested = true;
|
|
|
- stop();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Wait on all threads to finish.
|
|
|
- * Presumption is that all closes and stops have already been called.
|
|
|
- */
|
|
|
- void join() {
|
|
|
- try {
|
|
|
- this.workerThread.join();
|
|
|
- } catch(InterruptedException iex) {
|
|
|
- // continue
|
|
|
- }
|
|
|
- try {
|
|
|
- this.logRollerThread.join();
|
|
|
- } catch(InterruptedException iex) {
|
|
|
- // continue
|
|
|
- }
|
|
|
- try {
|
|
|
- this.cacheFlusherThread.join();
|
|
|
- } catch(InterruptedException iex) {
|
|
|
- // continue
|
|
|
- }
|
|
|
- try {
|
|
|
- this.splitOrCompactCheckerThread.join();
|
|
|
- } catch(InterruptedException iex) {
|
|
|
- // continue
|
|
|
- }
|
|
|
- try {
|
|
|
- this.server.join();
|
|
|
- } catch(InterruptedException iex) {
|
|
|
- // continue
|
|
|
- }
|
|
|
- LOG.info("HRegionServer stopped at: " +
|
|
|
- serverInfo.getServerAddress().toString());
|
|
|
+ this.logRollerThread =
|
|
|
+ new LogRoller(this.threadWakeFrequency, stopRequested);
|
|
|
+ // Server to handle client requests
|
|
|
+ this.server = RPC.getServer(this, address.getBindAddress(),
|
|
|
+ address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
|
|
|
+ false, conf);
|
|
|
+ this.serverInfo = new HServerInfo(new HServerAddress(
|
|
|
+ new InetSocketAddress(getThisIP(),
|
|
|
+ this.server.getListenerAddress().getPort())), this.rand.nextLong());
|
|
|
+ this.leases = new Leases(
|
|
|
+ conf.getInt("hbase.regionserver.lease.period", 3 * 60 * 1000),
|
|
|
+ this.threadWakeFrequency);
|
|
|
+ // Remote HMaster
|
|
|
+ this.hbaseMaster = (HMasterRegionInterface)RPC.waitForProxy(
|
|
|
+ HMasterRegionInterface.class, HMasterRegionInterface.versionID,
|
|
|
+ new HServerAddress(conf.get(MASTER_ADDRESS)).getInetSocketAddress(),
|
|
|
+ conf);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -530,21 +420,13 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
* load/unload instructions.
|
|
|
*/
|
|
|
public void run() {
|
|
|
- startAllServices();
|
|
|
-
|
|
|
// Set below if HMaster asked us stop.
|
|
|
boolean masterRequestedStop = false;
|
|
|
|
|
|
try {
|
|
|
+ init(reportForDuty());
|
|
|
while(!stopRequested.get()) {
|
|
|
long lastMsg = 0;
|
|
|
- try {
|
|
|
- reportForDuty();
|
|
|
- } catch(IOException e) {
|
|
|
- this.sleeper.sleep(lastMsg);
|
|
|
- continue;
|
|
|
- }
|
|
|
-
|
|
|
// Now ask master what it wants us to do and tell it what we have done
|
|
|
for (int tries = 0; !stopRequested.get();) {
|
|
|
if ((System.currentTimeMillis() - lastMsg) >= msgInterval) {
|
|
@@ -630,7 +512,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
LOG.fatal("Unhandled exception. Aborting...", t);
|
|
|
abort();
|
|
|
}
|
|
|
- leases.closeAfterLeasesExpire();
|
|
|
+ this.leases.closeAfterLeasesExpire();
|
|
|
this.worker.stop();
|
|
|
this.server.stop();
|
|
|
|
|
@@ -691,10 +573,52 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
serverInfo.getServerAddress().toString());
|
|
|
}
|
|
|
|
|
|
- join();
|
|
|
+ join();
|
|
|
LOG.info(Thread.currentThread().getName() + " exiting");
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Run init. Sets up hlog and starts up all server threads.
|
|
|
+ * @param c Extra configuration.
|
|
|
+ */
|
|
|
+ private void init(final MapWritable c) {
|
|
|
+ try {
|
|
|
+ for (Map.Entry<Writable, Writable> e: c.entrySet()) {
|
|
|
+ String key = e.getKey().toString();
|
|
|
+ String value = e.getValue().toString();
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Config from master: " + key + "=" + value);
|
|
|
+ }
|
|
|
+ this.conf.set(key, value);
|
|
|
+ }
|
|
|
+ this.log = setupHLog();
|
|
|
+ startServiceThreads();
|
|
|
+ } catch (IOException e) {
|
|
|
+ this.stopRequested.set(true);
|
|
|
+ LOG.fatal("Failed init",
|
|
|
+ RemoteExceptionHandler.checkIOException(e));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private HLog setupHLog()
|
|
|
+ throws RegionServerRunningException, IOException {
|
|
|
+ String rootDir = this.conf.get(HConstants.HBASE_DIR);
|
|
|
+ LOG.info("Root dir: " + rootDir);
|
|
|
+ Path logdir = new Path(new Path(rootDir),
|
|
|
+ "log" + "_" + getThisIP() + "_" +
|
|
|
+ this.serverInfo.getServerAddress().getPort());
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Log dir " + logdir);
|
|
|
+ }
|
|
|
+ FileSystem fs = FileSystem.get(this.conf);
|
|
|
+ if (fs.exists(logdir)) {
|
|
|
+ throw new RegionServerRunningException("region server already " +
|
|
|
+ "running at " + this.serverInfo.getServerAddress().toString() +
|
|
|
+ " because logdir " + logdir.toString() + " exists");
|
|
|
+ }
|
|
|
+ return new HLog(fs, logdir, conf);
|
|
|
+ }
|
|
|
+
|
|
|
/*
|
|
|
* Start Chore Threads, Server, Worker and lease checker threads. Install an
|
|
|
* UncaughtExceptionHandler that calls abort of RegionServer if we get
|
|
@@ -707,7 +631,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
* Chore, it keeps its own internal stop mechanism so needs to be stopped
|
|
|
* by this hosting server. Worker logs the exception and exits.
|
|
|
*/
|
|
|
- private void startAllServices() {
|
|
|
+ private void startServiceThreads() throws IOException {
|
|
|
String n = Thread.currentThread().getName();
|
|
|
UncaughtExceptionHandler handler = new UncaughtExceptionHandler() {
|
|
|
public void uncaughtException(Thread t, Throwable e) {
|
|
@@ -728,41 +652,105 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
Threads.setDaemonThreadRunning(this.workerThread, n + ".worker", handler);
|
|
|
// Leases is not a Thread. Internally it runs a daemon thread. If it gets
|
|
|
// an unhandled exception, it will just exit.
|
|
|
- this.leases = new Leases(
|
|
|
- conf.getInt("hbase.regionserver.lease.period", 3 * 60 * 1000),
|
|
|
- this.threadWakeFrequency);
|
|
|
this.leases.setName(n + ".leaseChecker");
|
|
|
this.leases.start();
|
|
|
// Start Server. This service is like leases in that it internally runs
|
|
|
// a thread.
|
|
|
- try {
|
|
|
- this.server.start();
|
|
|
- LOG.info("HRegionServer started at: " +
|
|
|
+ this.server.start();
|
|
|
+ LOG.info("HRegionServer started at: " +
|
|
|
serverInfo.getServerAddress().toString());
|
|
|
- } catch(IOException e) {
|
|
|
- this.stopRequested.set(true);
|
|
|
- LOG.fatal("Failed start Server",
|
|
|
- RemoteExceptionHandler.checkIOException(e));
|
|
|
+ }
|
|
|
+
|
|
|
+ /** @return the HLog */
|
|
|
+ HLog getLog() {
|
|
|
+ return this.log;
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Use interface to get the 'real' IP for this host. 'serverInfo' is sent to
|
|
|
+ * master. Should have the real IP of this host rather than 'localhost' or
|
|
|
+ * 0.0.0.0 or 127.0.0.1 in it.
|
|
|
+ * @return This servers' IP.
|
|
|
+ */
|
|
|
+ private String getThisIP() throws UnknownHostException {
|
|
|
+ return DNS.getDefaultIP(conf.get("dfs.datanode.dns.interface","default"));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Sets a flag that will cause all the HRegionServer threads to shut down
|
|
|
+ * in an orderly fashion. Used by unit tests and called by {@link Flusher}
|
|
|
+ * if it judges server needs to be restarted.
|
|
|
+ */
|
|
|
+ synchronized void stop() {
|
|
|
+ this.stopRequested.set(true);
|
|
|
+ notifyAll(); // Wakes run() if it is sleeping
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Cause the server to exit without closing the regions it is serving, the
|
|
|
+ * log it is using and without notifying the master.
|
|
|
+ * Used unit testing and on catastrophic events such as HDFS is yanked out
|
|
|
+ * from under hbase or we OOME.
|
|
|
+ */
|
|
|
+ synchronized void abort() {
|
|
|
+ this.abortRequested = true;
|
|
|
+ stop();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Wait on all threads to finish.
|
|
|
+ * Presumption is that all closes and stops have already been called.
|
|
|
+ */
|
|
|
+ void join() {
|
|
|
+ join(this.workerThread);
|
|
|
+ join(this.logRollerThread);
|
|
|
+ join(this.cacheFlusherThread);
|
|
|
+ join(this.splitOrCompactCheckerThread);
|
|
|
+ try {
|
|
|
+ this.server.join();
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ // No means of asking server if its done... .so just assume it is even
|
|
|
+ // if an interrupt.
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void join(final Thread t) {
|
|
|
+ while (t.isAlive()) {
|
|
|
+ try {
|
|
|
+ t.join();
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ // continue
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/*
|
|
|
* Let the master know we're here
|
|
|
- * @throws IOException
|
|
|
+ * Run initialization using parameters passed us by the master.
|
|
|
*/
|
|
|
- private void reportForDuty() throws IOException {
|
|
|
+ private MapWritable reportForDuty() {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Telling master we are up");
|
|
|
}
|
|
|
- this.requestCount.set(0);
|
|
|
- this.serverInfo.setLoad(new HServerLoad(0, onlineRegions.size()));
|
|
|
- this.hbaseMaster.regionServerStartup(serverInfo);
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Done telling master we are up");
|
|
|
+ MapWritable result = null;
|
|
|
+ while(!stopRequested.get()) {
|
|
|
+ long lastMsg = 0;
|
|
|
+ try {
|
|
|
+ this.requestCount.set(0);
|
|
|
+ this.serverInfo.setLoad(new HServerLoad(0, onlineRegions.size()));
|
|
|
+ result = this.hbaseMaster.regionServerStartup(serverInfo);
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Done telling master we are up");
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ } catch(IOException e) {
|
|
|
+ this.sleeper.sleep(lastMsg);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
}
|
|
|
+ return result;
|
|
|
}
|
|
|
|
|
|
-
|
|
|
/** Add to the outbound message buffer */
|
|
|
private void reportOpen(HRegion region) {
|
|
|
synchronized(outboundMsgs) {
|
|
@@ -808,7 +796,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
this.msg = msg;
|
|
|
}
|
|
|
}
|
|
|
- BlockingQueue<ToDoEntry> toDo;
|
|
|
+ BlockingQueue<ToDoEntry> toDo = new LinkedBlockingQueue<ToDoEntry>();
|
|
|
private Worker worker;
|
|
|
private Thread workerThread;
|
|
|
|
|
@@ -886,7 +874,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
void openRegion(HRegionInfo regionInfo) throws IOException {
|
|
|
HRegion region = onlineRegions.get(regionInfo.regionName);
|
|
|
if(region == null) {
|
|
|
- region = new HRegion(rootDir, log, fs, conf, regionInfo, null);
|
|
|
+ region = new HRegion(new Path(this.conf.get(HConstants.HBASE_DIR)),
|
|
|
+ this.log, FileSystem.get(conf), conf, regionInfo, null);
|
|
|
this.lock.writeLock().lock();
|
|
|
try {
|
|
|
this.log.setSequenceNumber(region.getMaxSequenceId());
|
|
@@ -1275,7 +1264,13 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
*/
|
|
|
protected boolean checkFileSystem() {
|
|
|
if (this.fsOk) {
|
|
|
- if (!FSUtils.isFileSystemAvailable(fs)) {
|
|
|
+ FileSystem fs = null;
|
|
|
+ try {
|
|
|
+ fs = FileSystem.get(this.conf);
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.error("Failed get of filesystem", e);
|
|
|
+ }
|
|
|
+ if (fs != null && !FSUtils.isFileSystemAvailable(fs, stopRequested)) {
|
|
|
LOG.fatal("Shutting down HRegionServer: file system not available");
|
|
|
this.abortRequested = true;
|
|
|
this.stopRequested.set(true);
|
|
@@ -1308,6 +1303,15 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
return regionsToCheck;
|
|
|
}
|
|
|
|
|
|
+ public long getProtocolVersion(final String protocol,
|
|
|
+ @SuppressWarnings("unused") final long clientVersion)
|
|
|
+ throws IOException {
|
|
|
+ if (protocol.equals(HRegionInterface.class.getName())) {
|
|
|
+ return HRegionInterface.versionID;
|
|
|
+ }
|
|
|
+ throw new IOException("Unknown protocol to name node: " + protocol);
|
|
|
+ }
|
|
|
+
|
|
|
//
|
|
|
// Main program and support routines
|
|
|
//
|