|
@@ -95,11 +95,10 @@ public class HConnectionManager implements HConstants {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /* encapsulates finding the servers for an HBase instance */
|
|
|
+ /* Encapsulates finding the servers for an HBase instance */
|
|
|
private static class TableServers implements HConnection, HConstants {
|
|
|
private static final Log LOG = LogFactory.getLog(TableServers.class);
|
|
|
private final Class<? extends HRegionInterface> serverInterfaceClass;
|
|
|
- private final long threadWakeFrequency;
|
|
|
private final long pause;
|
|
|
private final int numRetries;
|
|
|
|
|
@@ -110,21 +109,20 @@ public class HConnectionManager implements HConstants {
|
|
|
|
|
|
private final Integer rootRegionLock = new Integer(0);
|
|
|
private final Integer metaRegionLock = new Integer(0);
|
|
|
-
|
|
|
+ private final Integer userRegionLock = new Integer(0);
|
|
|
+
|
|
|
private volatile HBaseConfiguration conf;
|
|
|
|
|
|
- // Map tableName -> (Map startRow -> (HRegionInfo, HServerAddress)
|
|
|
- private Map<Text, SortedMap<Text, HRegionLocation>> tablesToServers;
|
|
|
-
|
|
|
// Set of closed tables
|
|
|
private Set<Text> closedTables;
|
|
|
|
|
|
- // Set of tables currently being located
|
|
|
- private Set<Text> tablesBeingLocated;
|
|
|
-
|
|
|
// Known region HServerAddress.toString() -> HRegionInterface
|
|
|
private Map<String, HRegionInterface> servers;
|
|
|
|
|
|
+ private HRegionLocation rootRegionLocation;
|
|
|
+
|
|
|
+ private Map<Text, SortedMap<Text, HRegionLocation>> cachedRegionLocations;
|
|
|
+
|
|
|
/**
|
|
|
* constructor
|
|
|
* @param conf Configuration object
|
|
@@ -147,20 +145,15 @@ public class HConnectionManager implements HConstants {
|
|
|
"Unable to find region server interface " + serverClassName, e);
|
|
|
}
|
|
|
|
|
|
- this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
|
|
|
this.pause = conf.getLong("hbase.client.pause", 30 * 1000);
|
|
|
this.numRetries = conf.getInt("hbase.client.retries.number", 5);
|
|
|
|
|
|
this.master = null;
|
|
|
this.masterChecked = false;
|
|
|
|
|
|
- this.tablesToServers =
|
|
|
+ this.cachedRegionLocations =
|
|
|
new ConcurrentHashMap<Text, SortedMap<Text, HRegionLocation>>();
|
|
|
-
|
|
|
this.closedTables = Collections.synchronizedSet(new HashSet<Text>());
|
|
|
- this.tablesBeingLocated = Collections.synchronizedSet(
|
|
|
- new HashSet<Text>());
|
|
|
-
|
|
|
this.servers = new ConcurrentHashMap<String, HRegionInterface>();
|
|
|
}
|
|
|
|
|
@@ -246,18 +239,30 @@ public class HConnectionManager implements HConstants {
|
|
|
/** {@inheritDoc} */
|
|
|
public HTableDescriptor[] listTables() throws IOException {
|
|
|
HashSet<HTableDescriptor> uniqueTables = new HashSet<HTableDescriptor>();
|
|
|
+ long scannerId = -1L;
|
|
|
+ HRegionInterface server = null;
|
|
|
+
|
|
|
+ Text startRow = EMPTY_START_ROW;
|
|
|
+ HRegionLocation metaLocation = null;
|
|
|
|
|
|
- SortedMap<Text, HRegionLocation> metaTables =
|
|
|
- getTableServers(META_TABLE_NAME);
|
|
|
-
|
|
|
- for (HRegionLocation t: metaTables.values()) {
|
|
|
- HRegionInterface server = getHRegionConnection(t.getServerAddress());
|
|
|
- long scannerId = -1L;
|
|
|
- try {
|
|
|
- scannerId = server.openScanner(t.getRegionInfo().getRegionName(),
|
|
|
- COLUMN_FAMILY_ARRAY, EMPTY_START_ROW, System.currentTimeMillis(),
|
|
|
- null);
|
|
|
-
|
|
|
+ // scan over the each meta region
|
|
|
+ do {
|
|
|
+ try{
|
|
|
+ // turn the start row into a location
|
|
|
+ metaLocation =
|
|
|
+ locateRegion(META_TABLE_NAME, startRow);
|
|
|
+
|
|
|
+ // connect to the server hosting the .META. region
|
|
|
+ server =
|
|
|
+ getHRegionConnection(metaLocation.getServerAddress());
|
|
|
+
|
|
|
+ // open a scanner over the meta region
|
|
|
+ scannerId = server.openScanner(
|
|
|
+ metaLocation.getRegionInfo().getRegionName(),
|
|
|
+ COLUMN_FAMILY_ARRAY, EMPTY_START_ROW, LATEST_TIMESTAMP,
|
|
|
+ null);
|
|
|
+
|
|
|
+ // iterate through the scanner, accumulating unique table names
|
|
|
while (true) {
|
|
|
HbaseMapWritable values = server.next(scannerId);
|
|
|
if (values == null || values.size() == 0) {
|
|
@@ -277,78 +282,330 @@ public class HConnectionManager implements HConstants {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- } catch (RemoteException ex) {
|
|
|
- throw RemoteExceptionHandler.decodeRemoteException(ex);
|
|
|
-
|
|
|
- } finally {
|
|
|
+
|
|
|
+ server.close(scannerId);
|
|
|
+ scannerId = -1L;
|
|
|
+
|
|
|
+ // advance the startRow to the end key of the current region
|
|
|
+ startRow = metaLocation.getRegionInfo().getEndKey();
|
|
|
+ } catch (IOException e) {
|
|
|
+ // need retry logic?
|
|
|
+ throw e;
|
|
|
+ }
|
|
|
+ finally {
|
|
|
if (scannerId != -1L) {
|
|
|
server.close(scannerId);
|
|
|
}
|
|
|
}
|
|
|
- }
|
|
|
+ } while (startRow.compareTo(EMPTY_START_ROW) != 0);
|
|
|
+
|
|
|
return uniqueTables.toArray(new HTableDescriptor[uniqueTables.size()]);
|
|
|
}
|
|
|
|
|
|
- /** {@inheritDoc} */
|
|
|
- public SortedMap<Text, HRegionLocation> getTableServers(Text tableName)
|
|
|
- throws IOException {
|
|
|
-
|
|
|
+ public HRegionLocation locateRegion(Text tableName, Text row)
|
|
|
+ throws IOException{
|
|
|
+ return locateRegion(tableName, row, true);
|
|
|
+ }
|
|
|
+
|
|
|
+ public HRegionLocation relocateRegion(Text tableName, Text row)
|
|
|
+ throws IOException{
|
|
|
+ return locateRegion(tableName, row, false);
|
|
|
+ }
|
|
|
+
|
|
|
+ private HRegionLocation locateRegion(Text tableName, Text row,
|
|
|
+ boolean useCache)
|
|
|
+ throws IOException{
|
|
|
if (tableName == null || tableName.getLength() == 0) {
|
|
|
throw new IllegalArgumentException(
|
|
|
"table name cannot be null or zero length");
|
|
|
}
|
|
|
+
|
|
|
+ if (tableName.equals(ROOT_TABLE_NAME)) {
|
|
|
+ synchronized (rootRegionLock) {
|
|
|
+ // This block guards against two threads trying to find the root
|
|
|
+ // region at the same time. One will go do the find while the
|
|
|
+ // second waits. The second thread will not do find.
|
|
|
+
|
|
|
+ if (!useCache || rootRegionLocation == null) {
|
|
|
+ return locateRootRegion();
|
|
|
+ }
|
|
|
+ return rootRegionLocation;
|
|
|
+ }
|
|
|
+ } else if (tableName.equals(META_TABLE_NAME)) {
|
|
|
+ synchronized (metaRegionLock) {
|
|
|
+ // This block guards against two threads trying to load the meta
|
|
|
+ // region at the same time. The first will load the meta region and
|
|
|
+ // the second will use the value that the first one found.
|
|
|
+
|
|
|
+ return locateRegionInMeta(ROOT_TABLE_NAME, tableName, row, useCache);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ synchronized(userRegionLock){
|
|
|
+ return locateRegionInMeta(META_TABLE_NAME, tableName, row, useCache);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- closedTables.remove(tableName);
|
|
|
+ /**
|
|
|
+ * Convenience method for turning a MapWritable into the underlying
|
|
|
+ * SortedMap we all know and love.
|
|
|
+ */
|
|
|
+ private SortedMap<Text, byte[]> sortedMapFromMapWritable(
|
|
|
+ HbaseMapWritable writable) {
|
|
|
+ SortedMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
|
|
|
+ for (Map.Entry<Writable, Writable> e: writable.entrySet()) {
|
|
|
+ HStoreKey key = (HStoreKey) e.getKey();
|
|
|
+ results.put(key.getColumn(),
|
|
|
+ ((ImmutableBytesWritable) e.getValue()).get());
|
|
|
+ }
|
|
|
|
|
|
- SortedMap<Text, HRegionLocation> tableServers =
|
|
|
- tablesToServers.get(tableName);
|
|
|
+ return results;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Search one of the meta tables (-ROOT- or .META.) for the HRegionLocation
|
|
|
+ * info that contains the table and row we're seeking.
|
|
|
+ */
|
|
|
+ private HRegionLocation locateRegionInMeta(Text parentTable,
|
|
|
+ Text tableName, Text row, boolean useCache)
|
|
|
+ throws IOException{
|
|
|
+ HRegionLocation location = null;
|
|
|
|
|
|
- if (tableServers == null ) {
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("No servers for " + tableName + ". Doing a find...");
|
|
|
+ // if we're supposed to be using the cache, then check it for a possible
|
|
|
+ // hit. otherwise, delete any existing cached location so it won't
|
|
|
+ // interfere.
|
|
|
+ if (useCache) {
|
|
|
+ location = getCachedLocation(tableName, row);
|
|
|
+ if (location != null) {
|
|
|
+ LOG.debug("Looking in " + parentTable + " for "
|
|
|
+ + tableName + "/" + row
|
|
|
+ + ", got a cache hit with "
|
|
|
+ + location.getRegionInfo().getRegionName());
|
|
|
+ return location;
|
|
|
+ }
|
|
|
+ } else{
|
|
|
+ deleteCachedLocation(tableName, row);
|
|
|
+ }
|
|
|
+
|
|
|
+ // build the key of the meta region we should be looking for.
|
|
|
+ // the extra 9's on the end are necessary to allow "exact" matches
|
|
|
+ // without knowing the precise region names.
|
|
|
+ Text metaKey = new Text(tableName.toString() + ","
|
|
|
+ + row.toString() + ",999999999999999");
|
|
|
+
|
|
|
+ int tries = 0;
|
|
|
+ while (true) {
|
|
|
+ tries++;
|
|
|
+
|
|
|
+ if (tries >= numRetries) {
|
|
|
+ throw new NoServerForRegionException("Unable to find region for "
|
|
|
+ + row + " after " + numRetries + " tries.");
|
|
|
+ }
|
|
|
+
|
|
|
+ try{
|
|
|
+ // locate the root region
|
|
|
+ HRegionLocation metaLocation = locateRegion(parentTable, metaKey);
|
|
|
+ HRegionInterface server =
|
|
|
+ getHRegionConnection(metaLocation.getServerAddress());
|
|
|
+
|
|
|
+ // query the root region for the location of the meta region
|
|
|
+ HbaseMapWritable regionInfoRow = server.getClosestRowBefore(
|
|
|
+ metaLocation.getRegionInfo().getRegionName(),
|
|
|
+ metaKey, HConstants.LATEST_TIMESTAMP);
|
|
|
+
|
|
|
+ // convert the MapWritable into a Map we can use
|
|
|
+ SortedMap<Text, byte[]> results =
|
|
|
+ sortedMapFromMapWritable(regionInfoRow);
|
|
|
+
|
|
|
+ byte[] bytes = results.get(COL_REGIONINFO);
|
|
|
+
|
|
|
+ if (bytes == null || bytes.length == 0) {
|
|
|
+ throw new IOException("HRegionInfo was null or empty in " +
|
|
|
+ parentTable);
|
|
|
+ }
|
|
|
+
|
|
|
+ // convert the row result into the HRegionLocation we need!
|
|
|
+ HRegionInfo regionInfo = (HRegionInfo) Writables.getWritable(
|
|
|
+ results.get(COL_REGIONINFO), new HRegionInfo());
|
|
|
+
|
|
|
+ if (regionInfo.isOffline()) {
|
|
|
+ throw new IllegalStateException("region offline: " +
|
|
|
+ regionInfo.getRegionName());
|
|
|
+ }
|
|
|
+
|
|
|
+ // possible we got a region of a different table...
|
|
|
+ if (!regionInfo.getTableDesc().getName().equals(tableName)) {
|
|
|
+ throw new TableNotFoundException(
|
|
|
+ "Table '" + tableName + "' was not found.");
|
|
|
+ }
|
|
|
+
|
|
|
+ String serverAddress =
|
|
|
+ Writables.bytesToString(results.get(COL_SERVER));
|
|
|
+
|
|
|
+ if (serverAddress.equals("")) {
|
|
|
+ throw new NoServerForRegionException(
|
|
|
+ "No server address listed in " + parentTable + " for region "
|
|
|
+ + regionInfo.getRegionName());
|
|
|
+ }
|
|
|
+
|
|
|
+ // instantiate the location
|
|
|
+ location = new HRegionLocation(regionInfo,
|
|
|
+ new HServerAddress(serverAddress));
|
|
|
+
|
|
|
+ cacheLocation(tableName, location);
|
|
|
+
|
|
|
+ return location;
|
|
|
+ } catch (IllegalStateException e) {
|
|
|
+ if (tries < numRetries - 1) {
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("reloading table servers because: " + e.getMessage());
|
|
|
+ }
|
|
|
+ relocateRegion(parentTable, metaKey);
|
|
|
+ } else {
|
|
|
+ throw e;
|
|
|
+ }
|
|
|
+ } catch (IOException e) {
|
|
|
+ if (e instanceof RemoteException) {
|
|
|
+ e = RemoteExceptionHandler.decodeRemoteException(
|
|
|
+ (RemoteException) e);
|
|
|
+ }
|
|
|
+ if (tries < numRetries - 1) {
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("reloading table servers because: " + e.getMessage());
|
|
|
+ }
|
|
|
+ relocateRegion(parentTable, metaKey);
|
|
|
+ } else {
|
|
|
+ throw e;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ try{
|
|
|
+ Thread.sleep(pause);
|
|
|
+ } catch (InterruptedException e){
|
|
|
+ // continue
|
|
|
}
|
|
|
- // We don't know where the table is.
|
|
|
- // Load the information from meta.
|
|
|
- tableServers = findServersForTable(tableName);
|
|
|
}
|
|
|
- SortedMap<Text, HRegionLocation> servers =
|
|
|
- new TreeMap<Text, HRegionLocation>();
|
|
|
- servers.putAll(tableServers);
|
|
|
- return servers;
|
|
|
}
|
|
|
|
|
|
- /** {@inheritDoc} */
|
|
|
- public SortedMap<Text, HRegionLocation>
|
|
|
- reloadTableServers(final Text tableName) throws IOException {
|
|
|
- closedTables.remove(tableName);
|
|
|
- SortedMap<Text, HRegionLocation> tableServers =
|
|
|
- new TreeMap<Text, HRegionLocation>();
|
|
|
- // Reload information for the whole table
|
|
|
- tableServers.putAll(findServersForTable(tableName));
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- StringBuilder sb = new StringBuilder();
|
|
|
- int count = 0;
|
|
|
- for (HRegionLocation location: tableServers.values()) {
|
|
|
- if (sb.length() > 0) {
|
|
|
- sb.append(" ");
|
|
|
+ /**
|
|
|
+ * Search the cache for a location that fits our table and row key.
|
|
|
+ * Return null if no suitable region is located. TODO: synchronization note
|
|
|
+ */
|
|
|
+ private HRegionLocation getCachedLocation(Text tableName, Text row) {
|
|
|
+ // find the map of cached locations for this table
|
|
|
+ SortedMap<Text, HRegionLocation> tableLocations =
|
|
|
+ cachedRegionLocations.get(tableName);
|
|
|
+
|
|
|
+ // if tableLocations for this table isn't built yet, make one
|
|
|
+ if (tableLocations == null) {
|
|
|
+ tableLocations = new TreeMap<Text, HRegionLocation>();
|
|
|
+ cachedRegionLocations.put(tableName, tableLocations);
|
|
|
+ }
|
|
|
+
|
|
|
+ // start to examine the cache. we can only do cache actions
|
|
|
+ // if there's something in the cache for this table.
|
|
|
+ if (!tableLocations.isEmpty()) {
|
|
|
+ if (tableLocations.containsKey(row)) {
|
|
|
+ return tableLocations.get(row);
|
|
|
+ }
|
|
|
+
|
|
|
+ // cut the cache so that we only get the part that could contain
|
|
|
+ // regions that match our key
|
|
|
+ SortedMap<Text, HRegionLocation> matchingRegions =
|
|
|
+ tableLocations.headMap(row);
|
|
|
+
|
|
|
+ // if that portion of the map is empty, then we're done. otherwise,
|
|
|
+ // we need to examine the cached location to verify that it is
|
|
|
+ // a match by end key as well.
|
|
|
+ if (!matchingRegions.isEmpty()) {
|
|
|
+ HRegionLocation possibleRegion =
|
|
|
+ matchingRegions.get(matchingRegions.lastKey());
|
|
|
+
|
|
|
+ Text endKey = possibleRegion.getRegionInfo().getEndKey();
|
|
|
+
|
|
|
+ // make sure that the end key is greater than the row we're looking
|
|
|
+ // for, otherwise the row actually belongs in the next region, not
|
|
|
+ // this one. the exception case is when the endkey is EMPTY_START_ROW,
|
|
|
+ // signifying that the region we're checking is actually the last
|
|
|
+ // region in the table.
|
|
|
+ if (endKey.equals(EMPTY_TEXT) || endKey.compareTo(row) > 0) {
|
|
|
+ return possibleRegion;
|
|
|
}
|
|
|
- sb.append(count++);
|
|
|
- sb.append(". ");
|
|
|
- sb.append("address=");
|
|
|
- sb.append(location.getServerAddress());
|
|
|
- sb.append(", ");
|
|
|
- sb.append(location.getRegionInfo().getRegionName());
|
|
|
}
|
|
|
- LOG.debug("Result of findTable on " + tableName.toString() +
|
|
|
- ": " + sb.toString());
|
|
|
}
|
|
|
|
|
|
- return tableServers;
|
|
|
+ // passed all the way through, so we got nothin - complete cache miss
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Delete a cached location, if it satisfies the table name and row
|
|
|
+ * requirements.
|
|
|
+ */
|
|
|
+ private void deleteCachedLocation(Text tableName, Text row){
|
|
|
+ // find the map of cached locations for this table
|
|
|
+ SortedMap<Text, HRegionLocation> tableLocations =
|
|
|
+ cachedRegionLocations.get(tableName);
|
|
|
+
|
|
|
+ // if tableLocations for this table isn't built yet, make one
|
|
|
+ if (tableLocations == null) {
|
|
|
+ tableLocations = new TreeMap<Text, HRegionLocation>();
|
|
|
+ cachedRegionLocations.put(tableName, tableLocations);
|
|
|
+ }
|
|
|
+
|
|
|
+ // start to examine the cache. we can only do cache actions
|
|
|
+ // if there's something in the cache for this table.
|
|
|
+ if (!tableLocations.isEmpty()) {
|
|
|
+ // cut the cache so that we only get the part that could contain
|
|
|
+ // regions that match our key
|
|
|
+ SortedMap<Text, HRegionLocation> matchingRegions =
|
|
|
+ tableLocations.headMap(row);
|
|
|
+
|
|
|
+ // if that portion of the map is empty, then we're done. otherwise,
|
|
|
+ // we need to examine the cached location to verify that it is
|
|
|
+ // a match by end key as well.
|
|
|
+ if (!matchingRegions.isEmpty()) {
|
|
|
+ HRegionLocation possibleRegion =
|
|
|
+ matchingRegions.get(matchingRegions.lastKey());
|
|
|
+
|
|
|
+ Text endKey = possibleRegion.getRegionInfo().getEndKey();
|
|
|
+
|
|
|
+ // by nature of the map, we know that the start key has to be <
|
|
|
+ // otherwise it wouldn't be in the headMap.
|
|
|
+ if (endKey.compareTo(row) <= 0) {
|
|
|
+ // delete any matching entry
|
|
|
+ tableLocations.remove(matchingRegions.lastKey());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Put a newly discovered HRegionLocation into the cache.
|
|
|
+ */
|
|
|
+ private void cacheLocation(Text tableName, HRegionLocation location){
|
|
|
+ Text startKey = location.getRegionInfo().getStartKey();
|
|
|
+
|
|
|
+ // find the map of cached locations for this table
|
|
|
+ SortedMap<Text, HRegionLocation> tableLocations =
|
|
|
+ cachedRegionLocations.get(tableName);
|
|
|
+
|
|
|
+ // if tableLocations for this table isn't built yet, make one
|
|
|
+ if (tableLocations == null) {
|
|
|
+ tableLocations = new TreeMap<Text, HRegionLocation>();
|
|
|
+ cachedRegionLocations.put(tableName, tableLocations);
|
|
|
+ }
|
|
|
+
|
|
|
+ // save the HRegionLocation under the startKey
|
|
|
+ tableLocations.put(startKey, location);
|
|
|
+ }
|
|
|
+
|
|
|
/** {@inheritDoc} */
|
|
|
public HRegionInterface getHRegionConnection(
|
|
|
- HServerAddress regionServer) throws IOException {
|
|
|
+ HServerAddress regionServer)
|
|
|
+ throws IOException {
|
|
|
|
|
|
HRegionInterface server;
|
|
|
synchronized (this.servers) {
|
|
@@ -390,192 +647,58 @@ public class HConnectionManager implements HConstants {
|
|
|
throw new IllegalArgumentException(
|
|
|
"table name cannot be null or zero length");
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
if (closedTables.contains(tableName)) {
|
|
|
// Table already closed. Ignore it.
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- SortedMap<Text, HRegionLocation> tableServers =
|
|
|
- tablesToServers.remove(tableName);
|
|
|
-
|
|
|
- if (tableServers == null) {
|
|
|
- // Table not open. Ignore it.
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
closedTables.add(tableName);
|
|
|
-
|
|
|
- // Shut down connections to the HRegionServers
|
|
|
|
|
|
- synchronized (this.servers) {
|
|
|
- for (HRegionLocation r: tableServers.values()) {
|
|
|
- this.servers.remove(r.getServerAddress().toString());
|
|
|
+ if (cachedRegionLocations.containsKey(tableName)) {
|
|
|
+ SortedMap<Text, HRegionLocation> tableServers =
|
|
|
+ cachedRegionLocations.remove(tableName);
|
|
|
+
|
|
|
+ // Shut down connections to the HRegionServers
|
|
|
+ synchronized (this.servers) {
|
|
|
+ for (HRegionLocation r: tableServers.values()) {
|
|
|
+ this.servers.remove(r.getServerAddress().toString());
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /** Convenience method for closing all open tables.*/
|
|
|
void closeAll() {
|
|
|
this.closed = true;
|
|
|
- ArrayList<Text> tables = new ArrayList<Text>(tablesToServers.keySet());
|
|
|
+ ArrayList<Text> tables =
|
|
|
+ new ArrayList<Text>(cachedRegionLocations.keySet());
|
|
|
for (Text tableName: tables) {
|
|
|
close(tableName);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /*
|
|
|
- * Clears the cache of all known information about the specified table and
|
|
|
- * locates a table by searching the META or ROOT region (as appropriate) or
|
|
|
- * by querying the master for the location of the root region if that is the
|
|
|
- * table requested.
|
|
|
- *
|
|
|
- * @param tableName - name of table to find servers for
|
|
|
- * @return - map of first row to table info for all regions in the table
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- private SortedMap<Text, HRegionLocation> findServersForTable(Text tableName)
|
|
|
- throws IOException {
|
|
|
-
|
|
|
- // Wipe out everything we know about this table
|
|
|
- if (this.tablesToServers.remove(tableName) != null) {
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Wiping out all we know of " + tableName);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- SortedMap<Text, HRegionLocation> srvrs =
|
|
|
- new TreeMap<Text, HRegionLocation>();
|
|
|
-
|
|
|
- if (tableName.equals(ROOT_TABLE_NAME)) {
|
|
|
- synchronized (rootRegionLock) {
|
|
|
- // This block guards against two threads trying to find the root
|
|
|
- // region at the same time. One will go do the find while the
|
|
|
- // second waits. The second thread will not do find.
|
|
|
-
|
|
|
- srvrs = this.tablesToServers.get(ROOT_TABLE_NAME);
|
|
|
-
|
|
|
- if (srvrs == null) {
|
|
|
- srvrs = locateRootRegion();
|
|
|
- }
|
|
|
- this.tablesToServers.put(tableName, srvrs);
|
|
|
- }
|
|
|
-
|
|
|
- } else if (tableName.equals(META_TABLE_NAME)) {
|
|
|
- synchronized (metaRegionLock) {
|
|
|
- // This block guards against two threads trying to load the meta
|
|
|
- // region at the same time. The first will load the meta region and
|
|
|
- // the second will use the value that the first one found.
|
|
|
-
|
|
|
- SortedMap<Text, HRegionLocation> rootServers =
|
|
|
- tablesToServers.get(ROOT_TABLE_NAME);
|
|
|
-
|
|
|
- for (boolean refindRoot = true; refindRoot; ) {
|
|
|
- if (rootServers == null || rootServers.size() == 0) {
|
|
|
- // (re)find the root region
|
|
|
- rootServers = findServersForTable(ROOT_TABLE_NAME);
|
|
|
- // but don't try again
|
|
|
- refindRoot = false;
|
|
|
- }
|
|
|
- try {
|
|
|
- srvrs = getTableServers(rootServers, META_TABLE_NAME);
|
|
|
- break;
|
|
|
-
|
|
|
- } catch (NotServingRegionException e) {
|
|
|
- if (!refindRoot) {
|
|
|
- // Already found root once. Give up.
|
|
|
- throw e;
|
|
|
- }
|
|
|
- // The root region must have moved - refind it
|
|
|
- rootServers.clear();
|
|
|
- }
|
|
|
- }
|
|
|
- this.tablesToServers.put(tableName, srvrs);
|
|
|
- }
|
|
|
- } else {
|
|
|
- boolean waited = false;
|
|
|
- synchronized (this.tablesBeingLocated) {
|
|
|
- // This block ensures that only one thread will actually try to
|
|
|
- // find a table. If a second thread comes along it will wait
|
|
|
- // until the first thread finishes finding the table.
|
|
|
-
|
|
|
- while (this.tablesBeingLocated.contains(tableName)) {
|
|
|
- waited = true;
|
|
|
- try {
|
|
|
- this.tablesBeingLocated.wait(threadWakeFrequency);
|
|
|
- } catch (InterruptedException e) {
|
|
|
- // continue
|
|
|
- }
|
|
|
- }
|
|
|
- if (!waited) {
|
|
|
- this.tablesBeingLocated.add(tableName);
|
|
|
-
|
|
|
- } else {
|
|
|
- SortedMap<Text, HRegionLocation> tableServers =
|
|
|
- this.tablesToServers.get(tableName);
|
|
|
-
|
|
|
- if (tableServers == null) {
|
|
|
- throw new TableNotFoundException("table not found: " + tableName);
|
|
|
- }
|
|
|
- srvrs.putAll(tableServers);
|
|
|
- }
|
|
|
- }
|
|
|
- if (!waited) {
|
|
|
- try {
|
|
|
- SortedMap<Text, HRegionLocation> metaServers =
|
|
|
- this.tablesToServers.get(META_TABLE_NAME);
|
|
|
-
|
|
|
- for (boolean refindMeta = true; refindMeta; ) {
|
|
|
- if (metaServers == null || metaServers.size() == 0) {
|
|
|
- // (re)find the meta table
|
|
|
- metaServers = findServersForTable(META_TABLE_NAME);
|
|
|
- // but don't try again
|
|
|
- refindMeta = false;
|
|
|
- }
|
|
|
- try {
|
|
|
- srvrs = getTableServers(metaServers, tableName);
|
|
|
- break;
|
|
|
-
|
|
|
- } catch (NotServingRegionException e) {
|
|
|
- if (!refindMeta) {
|
|
|
- // Already refound meta once. Give up.
|
|
|
- throw e;
|
|
|
- }
|
|
|
- // The meta table must have moved - refind it
|
|
|
- metaServers.clear();
|
|
|
- }
|
|
|
- }
|
|
|
- this.tablesToServers.put(tableName, srvrs);
|
|
|
- } finally {
|
|
|
- synchronized (this.tablesBeingLocated) {
|
|
|
- // Wake up the threads waiting for us to find the table
|
|
|
- this.tablesBeingLocated.remove(tableName);
|
|
|
- this.tablesBeingLocated.notifyAll();
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- this.tablesToServers.put(tableName, srvrs);
|
|
|
- return srvrs;
|
|
|
- }
|
|
|
-
|
|
|
/*
|
|
|
* Repeatedly try to find the root region by asking the master for where it is
|
|
|
- * @return TreeMap<Text, TableInfo> for root regin if found
|
|
|
+ * @return HRegionLocation for root region if found
|
|
|
* @throws NoServerForRegionException - if the root region can not be located
|
|
|
* after retrying
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- private TreeMap<Text, HRegionLocation> locateRootRegion()
|
|
|
+ private HRegionLocation locateRootRegion()
|
|
|
throws IOException {
|
|
|
|
|
|
getMaster();
|
|
|
|
|
|
- HServerAddress rootRegionLocation = null;
|
|
|
+ HServerAddress rootRegionAddress = null;
|
|
|
+
|
|
|
for (int tries = 0; tries < numRetries; tries++) {
|
|
|
int localTimeouts = 0;
|
|
|
- while (rootRegionLocation == null && localTimeouts < numRetries) {
|
|
|
- rootRegionLocation = master.findRootRegion();
|
|
|
- if (rootRegionLocation == null) {
|
|
|
+
|
|
|
+ // ask the master which server has the root region
|
|
|
+ while (rootRegionAddress == null && localTimeouts < numRetries) {
|
|
|
+ rootRegionAddress = master.findRootRegion();
|
|
|
+ if (rootRegionAddress == null) {
|
|
|
try {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Sleeping. Waiting for root region.");
|
|
@@ -591,15 +714,18 @@ public class HConnectionManager implements HConstants {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- if (rootRegionLocation == null) {
|
|
|
+ if (rootRegionAddress == null) {
|
|
|
throw new NoServerForRegionException(
|
|
|
"Timed out trying to locate root region");
|
|
|
}
|
|
|
|
|
|
- HRegionInterface rootRegion = getHRegionConnection(rootRegionLocation);
|
|
|
+ // get a connection to the region server
|
|
|
+ HRegionInterface server = getHRegionConnection(rootRegionAddress);
|
|
|
|
|
|
try {
|
|
|
- rootRegion.getRegionInfo(HRegionInfo.rootRegionInfo.getRegionName());
|
|
|
+ // if this works, then we're good, and we have an acceptable address,
|
|
|
+ // so we can stop doing retries and return the result.
|
|
|
+ server.getRegionInfo(HRegionInfo.rootRegionInfo.getRegionName());
|
|
|
break;
|
|
|
} catch (IOException e) {
|
|
|
if (tries == numRetries - 1) {
|
|
@@ -624,183 +750,20 @@ public class HConnectionManager implements HConstants {
|
|
|
// continue
|
|
|
}
|
|
|
}
|
|
|
- rootRegionLocation = null;
|
|
|
+
|
|
|
+ rootRegionAddress = null;
|
|
|
}
|
|
|
|
|
|
- if (rootRegionLocation == null) {
|
|
|
+ // if the adress is null by this point, then the retries have failed,
|
|
|
+ // and we're sort of sunk
|
|
|
+ if (rootRegionAddress == null) {
|
|
|
throw new NoServerForRegionException(
|
|
|
"unable to locate root region server");
|
|
|
}
|
|
|
|
|
|
- TreeMap<Text, HRegionLocation> rootServer =
|
|
|
- new TreeMap<Text, HRegionLocation>();
|
|
|
-
|
|
|
- rootServer.put(EMPTY_START_ROW,
|
|
|
- new HRegionLocation(HRegionInfo.rootRegionInfo, rootRegionLocation));
|
|
|
-
|
|
|
- return rootServer;
|
|
|
- }
|
|
|
-
|
|
|
- /*
|
|
|
- * @param metaServers the meta servers that would know where the table is
|
|
|
- * @param tableName name of the table
|
|
|
- * @return map of region start key -> server location
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- private SortedMap<Text, HRegionLocation> getTableServers(
|
|
|
- final SortedMap<Text, HRegionLocation> metaServers,
|
|
|
- final Text tableName) throws IOException {
|
|
|
-
|
|
|
- // If there is more than one meta server, find the first one that should
|
|
|
- // know about the table we are looking for, and reduce the number of
|
|
|
- // servers we need to query.
|
|
|
-
|
|
|
- SortedMap<Text, HRegionLocation> metaServersForTable = metaServers;
|
|
|
- if (metaServersForTable.size() > 1) {
|
|
|
- Text firstMetaRegion = metaServersForTable.headMap(tableName).lastKey();
|
|
|
- metaServersForTable = metaServersForTable.tailMap(firstMetaRegion);
|
|
|
- }
|
|
|
-
|
|
|
- SortedMap<Text, HRegionLocation> tableServers =
|
|
|
- new TreeMap<Text, HRegionLocation>();
|
|
|
-
|
|
|
- int tries = 0;
|
|
|
- do {
|
|
|
- if (tries >= numRetries - 1) {
|
|
|
- throw new NoServerForRegionException(
|
|
|
- "failed to find server for " + tableName + " after "
|
|
|
- + numRetries + " retries");
|
|
|
-
|
|
|
- } else if (tries > 0) {
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Sleeping. Table " + tableName +
|
|
|
- " not currently being served.");
|
|
|
- }
|
|
|
- try {
|
|
|
- Thread.sleep(pause);
|
|
|
- } catch (InterruptedException ie) {
|
|
|
- // continue
|
|
|
- }
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Wake. Retry finding table " + tableName);
|
|
|
- }
|
|
|
- }
|
|
|
- for (HRegionLocation t: metaServersForTable.values()) {
|
|
|
- tableServers.putAll(scanOneMetaRegion(t, tableName));
|
|
|
- }
|
|
|
- tries += 1;
|
|
|
- } while (tableServers.size() == 0);
|
|
|
- return tableServers;
|
|
|
- }
|
|
|
-
|
|
|
- /*
|
|
|
- * Scans a single meta region
|
|
|
- * @param t the meta region we're going to scan
|
|
|
- * @param tableName the name of the table we're looking for
|
|
|
- * @return returns a map of startingRow to TableInfo
|
|
|
- * @throws TableNotFoundException - if table does not exist
|
|
|
- * @throws IllegalStateException - if table is offline
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- private SortedMap<Text, HRegionLocation> scanOneMetaRegion(
|
|
|
- final HRegionLocation t, final Text tableName) throws IOException {
|
|
|
-
|
|
|
- HRegionInterface server = getHRegionConnection(t.getServerAddress());
|
|
|
- TreeMap<Text, HRegionLocation> servers =
|
|
|
- new TreeMap<Text, HRegionLocation>();
|
|
|
-
|
|
|
- long scannerId = -1L;
|
|
|
- try {
|
|
|
- scannerId = server.openScanner(t.getRegionInfo().getRegionName(),
|
|
|
- COLUMN_FAMILY_ARRAY, tableName, System.currentTimeMillis(), null);
|
|
|
-
|
|
|
- while (true) {
|
|
|
- HbaseMapWritable values = server.next(scannerId);
|
|
|
- if (values == null || values.size() == 0) {
|
|
|
- if (servers.size() == 0) {
|
|
|
- // If we didn't find any servers then the table does not exist
|
|
|
- throw new TableNotFoundException("table '" + tableName +
|
|
|
- "' does not exist in " + t);
|
|
|
- }
|
|
|
-
|
|
|
- // We found at least one server for the table and now we're done.
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Found " + servers.size() + " region(s) for " +
|
|
|
- tableName + " at " + t);
|
|
|
- }
|
|
|
- break;
|
|
|
- }
|
|
|
-
|
|
|
- SortedMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
|
|
|
- for (Map.Entry<Writable, Writable> e: values.entrySet()) {
|
|
|
- HStoreKey key = (HStoreKey) e.getKey();
|
|
|
- results.put(key.getColumn(),
|
|
|
- ((ImmutableBytesWritable) e.getValue()).get());
|
|
|
- }
|
|
|
-
|
|
|
- byte[] bytes = results.get(COL_REGIONINFO);
|
|
|
- if (bytes == null || bytes.length == 0) {
|
|
|
- // This can be null. Looks like an info:splitA or info:splitB
|
|
|
- // is only item in the row.
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug(COL_REGIONINFO.toString() + " came back empty: " +
|
|
|
- results.toString());
|
|
|
- }
|
|
|
- servers.clear();
|
|
|
- break;
|
|
|
- }
|
|
|
-
|
|
|
- HRegionInfo regionInfo = (HRegionInfo) Writables.getWritable(
|
|
|
- results.get(COL_REGIONINFO), new HRegionInfo());
|
|
|
-
|
|
|
- if (!regionInfo.getTableDesc().getName().equals(tableName)) {
|
|
|
- // We're done
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Found " + servers.size() + " servers for table " +
|
|
|
- tableName);
|
|
|
- }
|
|
|
- break;
|
|
|
- }
|
|
|
-
|
|
|
- if (regionInfo.isSplit()) {
|
|
|
- // Region is a split parent. Skip it.
|
|
|
- continue;
|
|
|
- }
|
|
|
-
|
|
|
- if (regionInfo.isOffline()) {
|
|
|
- throw new IllegalStateException("table offline: " + tableName);
|
|
|
- }
|
|
|
-
|
|
|
- bytes = results.get(COL_SERVER);
|
|
|
- if (bytes == null || bytes.length == 0) {
|
|
|
- // We need to rescan because the table we want is unassigned.
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("no server address for " + regionInfo.toString());
|
|
|
- }
|
|
|
- servers.clear();
|
|
|
- break;
|
|
|
- }
|
|
|
-
|
|
|
- String serverAddress = Writables.bytesToString(bytes);
|
|
|
- servers.put(regionInfo.getStartKey(), new HRegionLocation(
|
|
|
- regionInfo, new HServerAddress(serverAddress)));
|
|
|
- }
|
|
|
- } catch (IOException e) {
|
|
|
- if (e instanceof RemoteException) {
|
|
|
- e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
|
|
- }
|
|
|
- throw e;
|
|
|
-
|
|
|
- } finally {
|
|
|
- if (scannerId != -1L) {
|
|
|
- try {
|
|
|
- server.close(scannerId);
|
|
|
- } catch (Exception ex) {
|
|
|
- LOG.warn(ex);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- return servers;
|
|
|
+ // return the region location
|
|
|
+ return new HRegionLocation(
|
|
|
+ HRegionInfo.rootRegionInfo, rootRegionAddress);
|
|
|
}
|
|
|
}
|
|
|
}
|