|
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase;
|
|
|
|
|
|
import java.io.File;
|
|
|
import java.io.IOException;
|
|
|
-import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
@@ -31,28 +30,19 @@ import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.log4j.Logger;
|
|
|
|
|
|
/**
|
|
|
- * This class creates a single process HBase cluster for junit testing.
|
|
|
- * One thread is created for each server.
|
|
|
- *
|
|
|
- * <p>TestCases do not need to subclass to start a HBaseCluster. Call
|
|
|
- * {@link #startMaster(Configuration)} and
|
|
|
- * {@link #startRegionServers(Configuration, int)} to startup master and
|
|
|
- * region servers. Save off the returned values and pass them to
|
|
|
- * {@link #shutdown(org.apache.hadoop.hbase.MiniHBaseCluster.MasterThread, List)}
|
|
|
- * to shut it all down when done.
|
|
|
- *
|
|
|
+ * This class creates a single process HBase cluster. One thread is created for
|
|
|
+ * each server.
|
|
|
*/
|
|
|
public class MiniHBaseCluster implements HConstants {
|
|
|
static final Logger LOG =
|
|
|
Logger.getLogger(MiniHBaseCluster.class.getName());
|
|
|
+
|
|
|
private Configuration conf;
|
|
|
private MiniDFSCluster cluster;
|
|
|
private FileSystem fs;
|
|
|
private boolean shutdownDFS;
|
|
|
private Path parentdir;
|
|
|
- private MasterThread masterThread = null;
|
|
|
- List<RegionServerThread> regionThreads =
|
|
|
- java.util.Collections.synchronizedList(new ArrayList<RegionServerThread>());
|
|
|
+ private LocalHBaseCluster hbaseCluster;
|
|
|
private boolean deleteOnExit = true;
|
|
|
|
|
|
/**
|
|
@@ -63,8 +53,7 @@ public class MiniHBaseCluster implements HConstants {
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
public MiniHBaseCluster(Configuration conf, int nRegionNodes)
|
|
|
- throws IOException {
|
|
|
-
|
|
|
+ throws IOException {
|
|
|
this(conf, nRegionNodes, true, true, true);
|
|
|
}
|
|
|
|
|
@@ -143,108 +132,14 @@ public class MiniHBaseCluster implements HConstants {
|
|
|
try {
|
|
|
this.parentdir = new Path(conf.get(HBASE_DIR, DEFAULT_HBASE_DIR));
|
|
|
fs.mkdirs(parentdir);
|
|
|
- this.masterThread = startMaster(this.conf);
|
|
|
- this.regionThreads.addAll(startRegionServers(this.conf, nRegionNodes));
|
|
|
+ this.hbaseCluster = new LocalHBaseCluster(this.conf, nRegionNodes);
|
|
|
+ this.hbaseCluster.startup();
|
|
|
} catch(IOException e) {
|
|
|
shutdown();
|
|
|
throw e;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /** runs the master server */
|
|
|
- public static class MasterThread extends Thread {
|
|
|
- private final HMaster master;
|
|
|
- MasterThread(final HMaster m) {
|
|
|
- super(m, "Master:" + m.getMasterAddress().toString());
|
|
|
- this.master = m;
|
|
|
- }
|
|
|
-
|
|
|
- /** {@inheritDoc} */
|
|
|
- @Override
|
|
|
- public void run() {
|
|
|
- LOG.info("Starting " + getName());
|
|
|
- super.run();
|
|
|
- }
|
|
|
-
|
|
|
- /** @return master server */
|
|
|
- public HMaster getMaster() {
|
|
|
- return this.master;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /** runs region servers */
|
|
|
- public static class RegionServerThread extends Thread {
|
|
|
- private final HRegionServer regionServer;
|
|
|
- RegionServerThread(final HRegionServer r, final int index) {
|
|
|
- super(r, "RegionServer:" + index);
|
|
|
- this.regionServer = r;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void run() {
|
|
|
- LOG.info("Starting " + getName());
|
|
|
- super.run();
|
|
|
- }
|
|
|
-
|
|
|
- /** @return the region server */
|
|
|
- public HRegionServer getRegionServer() {
|
|
|
- return this.regionServer;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Use this method to start a master.
|
|
|
- * If you want to start an hbase cluster
|
|
|
- * without subclassing this test case, run this method and
|
|
|
- * {@link #startRegionServers(Configuration, int)} to start servers.
|
|
|
- * Call {@link #shutdown(org.apache.hadoop.hbase.MiniHBaseCluster.MasterThread, List)}
|
|
|
- * to shut them down.
|
|
|
- * @param c
|
|
|
- * @return Thread running the master.
|
|
|
- * @throws IOException
|
|
|
- * @see #startRegionServers(Configuration, int)
|
|
|
- * @see #shutdown(org.apache.hadoop.hbase.MiniHBaseCluster.MasterThread, List)
|
|
|
- */
|
|
|
- public static MasterThread startMaster(final Configuration c)
|
|
|
- throws IOException {
|
|
|
-
|
|
|
- if(c.get(MASTER_ADDRESS) == null) {
|
|
|
- c.set(MASTER_ADDRESS, "localhost:0");
|
|
|
- }
|
|
|
- // Create the master
|
|
|
- final HMaster m = new HMaster(c);
|
|
|
- MasterThread masterThread = new MasterThread(m);
|
|
|
- // Start up the master
|
|
|
- masterThread.start();
|
|
|
- // Set the master's port for the HRegionServers
|
|
|
- c.set(MASTER_ADDRESS, m.getMasterAddress().toString());
|
|
|
- return masterThread;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * @param c
|
|
|
- * @param count
|
|
|
- * @return List of region server threads started. Synchronize on the
|
|
|
- * returned list when iterating to avoid ConcurrentModificationExceptions.
|
|
|
- * @throws IOException
|
|
|
- * @see #startMaster(Configuration)
|
|
|
- */
|
|
|
- public static ArrayList<RegionServerThread> startRegionServers(
|
|
|
- final Configuration c, final int count) throws IOException {
|
|
|
-
|
|
|
- // Start the HRegionServers. Always have regionservers come up on
|
|
|
- // port '0' so there won't be clashes over default port as unit tests
|
|
|
- // start/stop ports at different times during the life of the test.
|
|
|
- c.set(REGIONSERVER_ADDRESS, DEFAULT_HOST + ":0");
|
|
|
- LOG.info("Starting HRegionServers");
|
|
|
- ArrayList<RegionServerThread> threads =
|
|
|
- new ArrayList<RegionServerThread>();
|
|
|
- for(int i = 0; i < count; i++) {
|
|
|
- threads.add(startRegionServer(c, i));
|
|
|
- }
|
|
|
- return threads;
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Starts a region server thread running
|
|
|
*
|
|
@@ -252,21 +147,10 @@ public class MiniHBaseCluster implements HConstants {
|
|
|
* @return Name of regionserver started.
|
|
|
*/
|
|
|
public String startRegionServer() throws IOException {
|
|
|
- RegionServerThread t =
|
|
|
- startRegionServer(this.conf, this.regionThreads.size());
|
|
|
- this.regionThreads.add(t);
|
|
|
- return t.getName();
|
|
|
- }
|
|
|
-
|
|
|
- private static RegionServerThread startRegionServer(final Configuration c,
|
|
|
- final int index)
|
|
|
- throws IOException {
|
|
|
- final HRegionServer hrs = new HRegionServer(c);
|
|
|
- RegionServerThread t = new RegionServerThread(hrs, index);
|
|
|
- t.setName("regionserver" +
|
|
|
- t.getRegionServer().server.getListenerAddress().toString());
|
|
|
+ LocalHBaseCluster.RegionServerThread t =
|
|
|
+ this.hbaseCluster.addRegionServer();
|
|
|
t.start();
|
|
|
- return t;
|
|
|
+ return t.getName();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -283,24 +167,24 @@ public class MiniHBaseCluster implements HConstants {
|
|
|
* the supplied port is not necessarily the actual port used.
|
|
|
*/
|
|
|
public HServerAddress getHMasterAddress() {
|
|
|
- return this.masterThread.getMaster().getMasterAddress();
|
|
|
+ return this.hbaseCluster.getMaster().getMasterAddress();
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * @return the thread running the HMaster
|
|
|
+ * @return the HMaster
|
|
|
*/
|
|
|
- public MasterThread getMasterThread() {
|
|
|
- return this.masterThread;
|
|
|
+ public HMaster getMaster() {
|
|
|
+ return this.hbaseCluster.getMaster();
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Cause a region server to exit without cleaning up
|
|
|
*
|
|
|
- * @param serverNumber
|
|
|
+ * @param serverNumber Used as index into a list.
|
|
|
*/
|
|
|
public void abortRegionServer(int serverNumber) {
|
|
|
HRegionServer server =
|
|
|
- this.regionThreads.get(serverNumber).getRegionServer();
|
|
|
+ this.hbaseCluster.getRegionServers().get(serverNumber).getRegionServer();
|
|
|
LOG.info("Aborting " + server.serverInfo.toString());
|
|
|
server.abort();
|
|
|
}
|
|
@@ -308,12 +192,12 @@ public class MiniHBaseCluster implements HConstants {
|
|
|
/**
|
|
|
* Shut down the specified region server cleanly
|
|
|
*
|
|
|
- * @param serverNumber
|
|
|
+ * @param serverNumber Used as index into a list.
|
|
|
* @return the region server that was stopped
|
|
|
*/
|
|
|
public HRegionServer stopRegionServer(int serverNumber) {
|
|
|
HRegionServer server =
|
|
|
- this.regionThreads.get(serverNumber).getRegionServer();
|
|
|
+ this.hbaseCluster.getRegionServers().get(serverNumber).getRegionServer();
|
|
|
LOG.info("Stopping " + server.toString());
|
|
|
server.stop();
|
|
|
return server;
|
|
@@ -325,99 +209,28 @@ public class MiniHBaseCluster implements HConstants {
|
|
|
* @param serverNumber
|
|
|
* @return Name of region server that just went down.
|
|
|
*/
|
|
|
- public String waitOnRegionServer(int serverNumber) {
|
|
|
- RegionServerThread regionServerThread =
|
|
|
- this.regionThreads.remove(serverNumber);
|
|
|
- try {
|
|
|
- LOG.info("Waiting on " +
|
|
|
- regionServerThread.getRegionServer().serverInfo.toString());
|
|
|
- regionServerThread.join();
|
|
|
- } catch (InterruptedException e) {
|
|
|
- e.printStackTrace();
|
|
|
- }
|
|
|
- return regionServerThread.getName();
|
|
|
+ public String waitOnRegionServer(final int serverNumber) {
|
|
|
+ return this.hbaseCluster.waitOnRegionServer(serverNumber);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Wait for Mini HBase Cluster to shut down.
|
|
|
*/
|
|
|
public void join() {
|
|
|
- if (regionThreads != null) {
|
|
|
- synchronized(regionThreads) {
|
|
|
- for(Thread t: regionThreads) {
|
|
|
- if (t.isAlive()) {
|
|
|
- try {
|
|
|
- t.join();
|
|
|
- } catch (InterruptedException e) {
|
|
|
- // continue
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- if (masterThread != null && masterThread.isAlive()) {
|
|
|
- try {
|
|
|
- masterThread.join();
|
|
|
- } catch(InterruptedException e) {
|
|
|
- // continue
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Shut down HBase cluster started by calling
|
|
|
- * {@link #startMaster(Configuration)} and then
|
|
|
- * {@link #startRegionServers(Configuration, int)};
|
|
|
- * @param masterThread
|
|
|
- * @param regionServerThreads
|
|
|
- */
|
|
|
- public static void shutdown(final MasterThread masterThread,
|
|
|
- final List<RegionServerThread> regionServerThreads) {
|
|
|
- LOG.info("Shutting down HBase Cluster");
|
|
|
- /** This is not needed. Remove.
|
|
|
- for(RegionServerThread hsr: regionServerThreads) {
|
|
|
- hsr.getRegionServer().stop();
|
|
|
- }
|
|
|
- */
|
|
|
- if(masterThread != null) {
|
|
|
- masterThread.getMaster().shutdown();
|
|
|
- }
|
|
|
- // regionServerThreads can never be null because they are initialized when
|
|
|
- // the class is constructed.
|
|
|
- synchronized(regionServerThreads) {
|
|
|
- for(Thread t: regionServerThreads) {
|
|
|
- if (t.isAlive()) {
|
|
|
- try {
|
|
|
- t.join();
|
|
|
- } catch (InterruptedException e) {
|
|
|
- // continue
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- if (masterThread != null) {
|
|
|
- try {
|
|
|
- masterThread.join();
|
|
|
- } catch(InterruptedException e) {
|
|
|
- // continue
|
|
|
- }
|
|
|
- }
|
|
|
- LOG.info("Shutdown " +
|
|
|
- ((masterThread != null)? masterThread.getName(): "0 masters") + " " +
|
|
|
- regionServerThreads.size() + " region server(s)");
|
|
|
+ this.hbaseCluster.join();
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Shut down the mini HBase cluster
|
|
|
*/
|
|
|
public void shutdown() {
|
|
|
- MiniHBaseCluster.shutdown(this.masterThread, this.regionThreads);
|
|
|
+ this.hbaseCluster.shutdown();
|
|
|
|
|
|
try {
|
|
|
if (shutdownDFS && cluster != null) {
|
|
|
FileSystem fs = cluster.getFileSystem();
|
|
|
|
|
|
- LOG.info("Shutting down Mini DFS cluster");
|
|
|
+ LOG.info("Shutting down Mini DFS ");
|
|
|
cluster.shutdown();
|
|
|
|
|
|
if (fs != null) {
|
|
@@ -454,13 +267,18 @@ public class MiniHBaseCluster implements HConstants {
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
void flushcache() throws IOException {
|
|
|
- HRegionServer s = this.regionThreads.get(0).getRegionServer();
|
|
|
- for(HRegion r: s.onlineRegions.values() ) {
|
|
|
- r.flushcache(false);
|
|
|
+ for (LocalHBaseCluster.RegionServerThread t:
|
|
|
+ this.hbaseCluster.getRegionServers()) {
|
|
|
+ for(HRegion r: t.getRegionServer().onlineRegions.values() ) {
|
|
|
+ r.flushcache(false);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public List<RegionServerThread> getRegionThreads() {
|
|
|
- return this.regionThreads;
|
|
|
+ /**
|
|
|
+ * @return List of region server threads.
|
|
|
+ */
|
|
|
+ public List<LocalHBaseCluster.RegionServerThread> getRegionThreads() {
|
|
|
+ return this.hbaseCluster.getRegionServers();
|
|
|
}
|
|
|
}
|