|
@@ -1606,99 +1606,104 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
|
|
|
private void assignRegions(HServerInfo info, String serverName,
|
|
|
ArrayList<HMsg> returnMsgs) {
|
|
|
|
|
|
- long now = System.currentTimeMillis();
|
|
|
- Set<Text> regionsToAssign = new HashSet<Text>();
|
|
|
synchronized (this.assignAttempts) {
|
|
|
+
|
|
|
+ // We need to hold a lock on assign attempts while we figure out what to
|
|
|
+ // do so that multiple threads do not execute this method in parallel
|
|
|
+ // resulting in assigning the same region to multiple servers.
|
|
|
+
|
|
|
+ long now = System.currentTimeMillis();
|
|
|
+ Set<Text> regionsToAssign = new HashSet<Text>();
|
|
|
for (Map.Entry<Text, Long> e: this.assignAttempts.entrySet()) {
|
|
|
long diff = now - e.getValue().longValue();
|
|
|
if (diff > this.maxRegionOpenTime) {
|
|
|
regionsToAssign.add(e.getKey());
|
|
|
}
|
|
|
}
|
|
|
- }
|
|
|
- int nRegionsToAssign = regionsToAssign.size();
|
|
|
- if (nRegionsToAssign <= 0) {
|
|
|
- // No regions to assign. Return.
|
|
|
- return;
|
|
|
- }
|
|
|
+ int nRegionsToAssign = regionsToAssign.size();
|
|
|
+ if (nRegionsToAssign <= 0) {
|
|
|
+ // No regions to assign. Return.
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
- if (this.serversToServerInfo.size() == 1) {
|
|
|
- assignRegionsToOneServer(regionsToAssign, serverName, returnMsgs);
|
|
|
- // Finished. Return.
|
|
|
- return;
|
|
|
- }
|
|
|
+ if (this.serversToServerInfo.size() == 1) {
|
|
|
+ assignRegionsToOneServer(regionsToAssign, serverName, returnMsgs);
|
|
|
+ // Finished. Return.
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
- // Multiple servers in play.
|
|
|
- // We need to allocate regions only to most lightly loaded servers.
|
|
|
- HServerLoad thisServersLoad = info.getLoad();
|
|
|
- int nregions = regionsPerServer(nRegionsToAssign, thisServersLoad);
|
|
|
- nRegionsToAssign -= nregions;
|
|
|
- if (nRegionsToAssign > 0) {
|
|
|
- // We still have more regions to assign. See how many we can assign
|
|
|
- // before this server becomes more heavily loaded than the next
|
|
|
- // most heavily loaded server.
|
|
|
- SortedMap<HServerLoad, Set<String>> heavyServers =
|
|
|
- new TreeMap<HServerLoad, Set<String>>();
|
|
|
- synchronized (this.loadToServers) {
|
|
|
- heavyServers.putAll(this.loadToServers.tailMap(thisServersLoad));
|
|
|
- }
|
|
|
- int nservers = 0;
|
|
|
- HServerLoad heavierLoad = null;
|
|
|
- for (Map.Entry<HServerLoad, Set<String>> e : heavyServers.entrySet()) {
|
|
|
- Set<String> servers = e.getValue();
|
|
|
- nservers += servers.size();
|
|
|
- if (e.getKey().compareTo(thisServersLoad) == 0) {
|
|
|
- // This is the load factor of the server we are considering
|
|
|
- nservers -= 1;
|
|
|
- continue;
|
|
|
- }
|
|
|
+ // Multiple servers in play.
|
|
|
+ // We need to allocate regions only to most lightly loaded servers.
|
|
|
+ HServerLoad thisServersLoad = info.getLoad();
|
|
|
+ int nregions = regionsPerServer(nRegionsToAssign, thisServersLoad);
|
|
|
+ nRegionsToAssign -= nregions;
|
|
|
+ if (nRegionsToAssign > 0) {
|
|
|
+ // We still have more regions to assign. See how many we can assign
|
|
|
+ // before this server becomes more heavily loaded than the next
|
|
|
+ // most heavily loaded server.
|
|
|
+ SortedMap<HServerLoad, Set<String>> heavyServers =
|
|
|
+ new TreeMap<HServerLoad, Set<String>>();
|
|
|
+ synchronized (this.loadToServers) {
|
|
|
+ heavyServers.putAll(this.loadToServers.tailMap(thisServersLoad));
|
|
|
+ }
|
|
|
+ int nservers = 0;
|
|
|
+ HServerLoad heavierLoad = null;
|
|
|
+ for (Map.Entry<HServerLoad, Set<String>> e : heavyServers.entrySet()) {
|
|
|
+ Set<String> servers = e.getValue();
|
|
|
+ nservers += servers.size();
|
|
|
+ if (e.getKey().compareTo(thisServersLoad) == 0) {
|
|
|
+ // This is the load factor of the server we are considering
|
|
|
+ nservers -= 1;
|
|
|
+ continue;
|
|
|
+ }
|
|
|
|
|
|
- // If we get here, we are at the first load entry that is a
|
|
|
- // heavier load than the server we are considering
|
|
|
- heavierLoad = e.getKey();
|
|
|
- break;
|
|
|
- }
|
|
|
+ // If we get here, we are at the first load entry that is a
|
|
|
+ // heavier load than the server we are considering
|
|
|
+ heavierLoad = e.getKey();
|
|
|
+ break;
|
|
|
+ }
|
|
|
|
|
|
- nregions = 0;
|
|
|
- if (heavierLoad != null) {
|
|
|
- // There is a more heavily loaded server
|
|
|
- for (HServerLoad load =
|
|
|
- new HServerLoad(thisServersLoad.getNumberOfRequests(),
|
|
|
- thisServersLoad.getNumberOfRegions());
|
|
|
- load.compareTo(heavierLoad) <= 0 && nregions < nRegionsToAssign;
|
|
|
- load.setNumberOfRegions(load.getNumberOfRegions() + 1), nregions++) {
|
|
|
- // continue;
|
|
|
+ nregions = 0;
|
|
|
+ if (heavierLoad != null) {
|
|
|
+ // There is a more heavily loaded server
|
|
|
+ for (HServerLoad load =
|
|
|
+ new HServerLoad(thisServersLoad.getNumberOfRequests(),
|
|
|
+ thisServersLoad.getNumberOfRegions());
|
|
|
+ load.compareTo(heavierLoad) <= 0 && nregions < nRegionsToAssign;
|
|
|
+ load.setNumberOfRegions(load.getNumberOfRegions() + 1), nregions++) {
|
|
|
+ // continue;
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- if (nregions < nRegionsToAssign) {
|
|
|
- // There are some more heavily loaded servers
|
|
|
- // but we can't assign all the regions to this server.
|
|
|
- if (nservers > 0) {
|
|
|
- // There are other servers that can share the load.
|
|
|
- // Split regions that need assignment across the servers.
|
|
|
- nregions = (int) Math.ceil((1.0 * nRegionsToAssign)
|
|
|
- / (1.0 * nservers));
|
|
|
+ if (nregions < nRegionsToAssign) {
|
|
|
+ // There are some more heavily loaded servers
|
|
|
+ // but we can't assign all the regions to this server.
|
|
|
+ if (nservers > 0) {
|
|
|
+ // There are other servers that can share the load.
|
|
|
+ // Split regions that need assignment across the servers.
|
|
|
+ nregions = (int) Math.ceil((1.0 * nRegionsToAssign)
|
|
|
+ / (1.0 * nservers));
|
|
|
+ } else {
|
|
|
+ // No other servers with same load.
|
|
|
+ // Split regions over all available servers
|
|
|
+ nregions = (int) Math.ceil((1.0 * nRegionsToAssign)
|
|
|
+ / (1.0 * serversToServerInfo.size()));
|
|
|
+ }
|
|
|
} else {
|
|
|
- // No other servers with same load.
|
|
|
- // Split regions over all available servers
|
|
|
- nregions = (int) Math.ceil((1.0 * nRegionsToAssign)
|
|
|
- / (1.0 * serversToServerInfo.size()));
|
|
|
- }
|
|
|
- } else {
|
|
|
- // Assign all regions to this server
|
|
|
- nregions = nRegionsToAssign;
|
|
|
- }
|
|
|
-
|
|
|
- now = System.currentTimeMillis();
|
|
|
- for (Text regionName: regionsToAssign) {
|
|
|
- HRegionInfo regionInfo = this.unassignedRegions.get(regionName);
|
|
|
- LOG.info("assigning region " + regionName + " to server " +
|
|
|
- serverName);
|
|
|
- this.assignAttempts.put(regionName, Long.valueOf(now));
|
|
|
- returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo));
|
|
|
- if (--nregions <= 0) {
|
|
|
- break;
|
|
|
+ // Assign all regions to this server
|
|
|
+ nregions = nRegionsToAssign;
|
|
|
+ }
|
|
|
+
|
|
|
+ now = System.currentTimeMillis();
|
|
|
+ for (Text regionName: regionsToAssign) {
|
|
|
+ HRegionInfo regionInfo = this.unassignedRegions.get(regionName);
|
|
|
+ LOG.info("assigning region " + regionName + " to server " +
|
|
|
+ serverName);
|
|
|
+ this.assignAttempts.put(regionName, Long.valueOf(now));
|
|
|
+ returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo));
|
|
|
+ if (--nregions <= 0) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -2092,14 +2097,18 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
|
|
|
if (numberOfMetaRegions.get() != onlineMetaRegions.size()) {
|
|
|
// We can't proceed because not all of the meta regions are online.
|
|
|
// We can't block either because that would prevent the meta region
|
|
|
- // online message from being processed. So return false to have this
|
|
|
- // operation requeued.
|
|
|
-
|
|
|
+ // online message from being processed. In order to prevent spinning
|
|
|
+ // in the run queue, put this request on the delay queue to give
|
|
|
+ // other threads the opportunity to get the meta regions on-line.
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug(
|
|
|
"Requeuing shutdown because not all meta regions are online");
|
|
|
}
|
|
|
- return false;
|
|
|
+ this.expire = System.currentTimeMillis() + leaseTimeout / 2;
|
|
|
+ shutdownQueue.put(this);
|
|
|
+
|
|
|
+ // Return true so run() does not put us back on the msgQueue
|
|
|
+ return true;
|
|
|
}
|
|
|
for (int tries = 0; tries < numRetries; tries++) {
|
|
|
try {
|