@@ -1372,7 +1372,7 @@ public class BlockManager {
       if (numEffectiveReplicas >= requiredReplication) {
         if ( (pendingReplications.getNumReplicas(block) > 0) ||
-             (blockHasEnoughRacks(block)) ) {
+             (isPlacementPolicySatisfied(block)) ) {
           neededReplications.remove(block, priority); // remove from neededReplications
           neededReplications.decrementReplicationIndex(priority);
           blockLog.info("BLOCK* Removing " + block
@@ -1443,7 +1443,7 @@ public class BlockManager {
       if (numEffectiveReplicas >= requiredReplication) {
         if ( (pendingReplications.getNumReplicas(block) > 0) ||
-             (blockHasEnoughRacks(block)) ) {
+             (isPlacementPolicySatisfied(block)) ) {
           neededReplications.remove(block, priority); // remove from neededReplications
           neededReplications.decrementReplicationIndex(priority);
           rw.targets = null;
@@ -1454,7 +1454,7 @@ public class BlockManager {
       }
 
       if ( (numReplicas.liveReplicas() >= requiredReplication) &&
-           (!blockHasEnoughRacks(block)) ) {
+           (!isPlacementPolicySatisfied(block)) ) {
         if (rw.srcNode.getNetworkLocation().equals(
             targets[0].getDatanodeDescriptor().getNetworkLocation())) {
           //No use continuing, unless a new rack in this case
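
The three hunks above are the call-site half of the change: every place BlockManager used to ask the hard-coded two-rack question (blockHasEnoughRacks) now defers to the configured BlockPlacementPolicy via isPlacementPolicySatisfied, which is defined in a later hunk of this patch. Below is a minimal, self-contained sketch of the rack rule the old check hard-coded, phrased through the status-object shape the patch adopts (verifyBlockPlacement(...).isPlacementPolicySatisfied()); the class and method names here are illustrative assumptions, not the real HDFS types:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    class PlacementSketch {
      // Minimal stand-in for the placement-status object the patch consults.
      interface Status { boolean isPlacementPolicySatisfied(); }

      // Roughly the rule blockHasEnoughRacks hard-coded: a block with more
      // than one expected replica should span at least two racks.
      static Status rackRule(List<String> replicaRacks, int expectedReplicas) {
        Set<String> racks = new HashSet<String>(replicaRacks);
        int minRacks = Math.min(2, expectedReplicas);
        return () -> racks.size() >= minRacks;
      }

      public static void main(String[] args) {
        // Two replicas crammed onto one rack: not satisfied.
        System.out.println(rackRule(Arrays.asList("/r1", "/r1"), 2)
            .isPlacementPolicySatisfied()); // false
        // Spread over two racks: satisfied.
        System.out.println(rackRule(Arrays.asList("/r1", "/r2"), 2)
            .isPlacementPolicySatisfied()); // true
      }
    }

The benefit of the indirection is that a policy with stricter constraints (for example, one that is node-group or upgrade-domain aware) can keep a block on the neededReplications queue even when the plain rack count looks fine.
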
@@ -2873,109 +2873,48 @@ public class BlockManager {
         }
       }
       chooseExcessReplicates(nonExcess, block, replication,
-          addedNode, delNodeHint, blockplacement);
+          addedNode, delNodeHint);
     }
 
-
-  /**
-   * We want "replication" replicates for the block, but we now have too many.
-   * In this method, copy enough nodes from 'srcNodes' into 'dstNodes' such that:
-   *
-   * srcNodes.size() - dstNodes.size() == replication
-   *
-   * We pick node that make sure that replicas are spread across racks and
-   * also try hard to pick one with least free space.
-   * The algorithm is first to pick a node with least free space from nodes
-   * that are on a rack holding more than one replicas of the block.
-   * So removing such a replica won't remove a rack.
-   * If no such a node is available,
-   * then pick a node with least free space
-   */
-  private void chooseExcessReplicates(final Collection<DatanodeStorageInfo> nonExcess,
-                              Block b, short replication,
-                              DatanodeDescriptor addedNode,
-                              DatanodeDescriptor delNodeHint,
-                              BlockPlacementPolicy replicator) {
+  private void chooseExcessReplicates(
+      final Collection<DatanodeStorageInfo> nonExcess,
+      Block b, short replication,
+      DatanodeDescriptor addedNode,
+      DatanodeDescriptor delNodeHint) {
     assert namesystem.hasWriteLock();
     // first form a rack to datanodes map and
     BlockCollection bc = getBlockCollection(b);
-    final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(bc.getStoragePolicyID());
+    final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(
+        bc.getStoragePolicyID());
     final List<StorageType> excessTypes = storagePolicy.chooseExcess(
         replication, DatanodeStorageInfo.toStorageTypes(nonExcess));
-
-
-    final Map<String, List<DatanodeStorageInfo>> rackMap
-        = new HashMap<String, List<DatanodeStorageInfo>>();
-    final List<DatanodeStorageInfo> moreThanOne = new ArrayList<DatanodeStorageInfo>();
-    final List<DatanodeStorageInfo> exactlyOne = new ArrayList<DatanodeStorageInfo>();
-
-    // split nodes into two sets
-    // moreThanOne contains nodes on rack with more than one replica
-    // exactlyOne contains the remaining nodes
-    replicator.splitNodesWithRack(nonExcess, rackMap, moreThanOne, exactlyOne);
-
-    // pick one node to delete that favors the delete hint
-    // otherwise pick one with least space from priSet if it is not empty
-    // otherwise one node with least space from remains
-    boolean firstOne = true;
-    final DatanodeStorageInfo delNodeHintStorage
-        = DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, delNodeHint);
-    final DatanodeStorageInfo addedNodeStorage
-        = DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, addedNode);
-    while (nonExcess.size() - replication > 0) {
-      final DatanodeStorageInfo cur;
-      if (useDelHint(firstOne, delNodeHintStorage, addedNodeStorage,
-          moreThanOne, excessTypes)) {
-        cur = delNodeHintStorage;
-      } else { // regular excessive replica removal
-        cur = replicator.chooseReplicaToDelete(bc, b, replication,
-            moreThanOne, exactlyOne, excessTypes);
-      }
-      firstOne = false;
-
-      // adjust rackmap, moreThanOne, and exactlyOne
-      replicator.adjustSetsWithChosenReplica(rackMap, moreThanOne,
-          exactlyOne, cur);
-
-      nonExcess.remove(cur);
-      addToExcessReplicate(cur.getDatanodeDescriptor(), b);
-
-      //
-      // The 'excessblocks' tracks blocks until we get confirmation
-      // that the datanode has deleted them; the only way we remove them
-      // is when we get a "removeBlock" message.
-      //
-      // The 'invalidate' list is used to inform the datanode the block
-      // should be deleted.  Items are removed from the invalidate list
-      // upon giving instructions to the namenode.
-      //
-      addToInvalidates(b, cur.getDatanodeDescriptor());
-      blockLog.info("BLOCK* chooseExcessReplicates: "
-          +"("+cur+", "+b+") is added to invalidated blocks set");
-    }
-  }
-
-  /** Check if we can use delHint */
-  static boolean useDelHint(boolean isFirst, DatanodeStorageInfo delHint,
-      DatanodeStorageInfo added, List<DatanodeStorageInfo> moreThan1Racks,
-      List<StorageType> excessTypes) {
-    if (!isFirst) {
-      return false; // only consider delHint for the first case
-    } else if (delHint == null) {
-      return false; // no delHint
-    } else if (!excessTypes.contains(delHint.getStorageType())) {
-      return false; // delHint storage type is not an excess type
-    } else {
-      // check if removing delHint reduces the number of racks
-      if (moreThan1Racks.contains(delHint)) {
-        return true; // delHint and some other nodes are under the same rack
-      } else if (added != null && !moreThan1Racks.contains(added)) {
-        return true; // the added node adds a new rack
-      }
-      return false; // removing delHint reduces the number of racks;
+    List<DatanodeStorageInfo> replicasToDelete = blockplacement
+        .chooseReplicasToDelete(nonExcess, replication, excessTypes,
+            addedNode, delNodeHint);
+    for (DatanodeStorageInfo chosenReplica : replicasToDelete) {
+      processChosenExcessReplica(nonExcess, chosenReplica, b);
     }
   }
 
+  private void processChosenExcessReplica(
+      final Collection<DatanodeStorageInfo> nonExcess,
+      final DatanodeStorageInfo chosen, Block b) {
+    nonExcess.remove(chosen);
+    addToExcessReplicate(chosen.getDatanodeDescriptor(), b);
+    //
+    // The 'excessblocks' tracks blocks until we get confirmation
+    // that the datanode has deleted them; the only way we remove them
+    // is when we get a "removeBlock" message.
+    //
+    // The 'invalidate' list is used to inform the datanode the block
+    // should be deleted. Items are removed from the invalidate list
+    // upon giving instructions to the datanodes.
+    //
+    addToInvalidates(b, chosen.getDatanodeDescriptor());
+    blockLog.debug("BLOCK* chooseExcessReplicates: "
+        +"("+chosen+", "+b+") is added to invalidated blocks set");
+  }
+
   private void addToExcessReplicate(DatanodeInfo dn, Block block) {
     assert namesystem.hasWriteLock();
     LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(dn.getDatanodeUuid());
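
This hunk is the heart of the refactor: replica-deletion policy moves out of BlockManager. The delete-hint check (useDelHint) is deleted here rather than lost; the selection responsibility now sits behind BlockPlacementPolicy.chooseReplicasToDelete, presumably so the default policy can keep equivalent behavior while subclasses override the whole selection. For reference while reviewing, here is the removed decision restated as a standalone, generically typed predicate (behavior-preserving sketch, not HDFS code; the parameter names are illustrative):

    import java.util.Arrays;
    import java.util.List;

    final class DelHintSketch {
      // True only when deleting the hinted replica cannot shrink the rack
      // count: either the hint shares its rack with another replica, or the
      // newly added replica opened up a new rack.
      static <N> boolean useDelHint(boolean isFirst, N delHint, N added,
          List<N> onRackWithMoreThanOne, List<String> excessTypes,
          String delHintStorageType) {
        if (!isFirst || delHint == null) {
          return false; // the hint is only honored for the first pick
        }
        if (!excessTypes.contains(delHintStorageType)) {
          return false; // hint's storage type is not an excess type
        }
        return onRackWithMoreThanOne.contains(delHint)
            || (added != null && !onRackWithMoreThanOne.contains(added));
      }

      public static void main(String[] args) {
        // First pick, hint shares a rack with another replica: honor it.
        System.out.println(useDelHint(true, "hint", null,
            Arrays.asList("hint", "other"), Arrays.asList("DISK"), "DISK")); // true
        // Not the first pick: never honor the hint.
        System.out.println(useDelHint(false, "hint", null,
            Arrays.asList("hint"), Arrays.asList("DISK"), "DISK")); // false
      }
    }

BlockManager keeps only the bookkeeping: processChosenExcessReplica records the excess replica and queues the invalidation, exactly as the old loop body did.
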
@@ -3544,33 +3483,20 @@ public class BlockManager {
     return toInvalidate.size();
   }
 
-  boolean blockHasEnoughRacks(Block b) {
-    boolean enoughRacks = false;;
-    Collection<DatanodeDescriptor> corruptNodes =
-      corruptReplicas.getNodes(b);
-    int numExpectedReplicas = getReplication(b);
-    String rackName = null;
-    for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
+  boolean isPlacementPolicySatisfied(Block b) {
+    List<DatanodeDescriptor> liveNodes = new ArrayList<DatanodeDescriptor>();
+    Collection<DatanodeDescriptor> corruptNodes = corruptReplicas
+        .getNodes(b);
+    for (DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
       final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
-      if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
-        if ((corruptNodes == null ) || !corruptNodes.contains(cur)) {
-          if (numExpectedReplicas == 1 ||
-              (numExpectedReplicas > 1 &&
-                !datanodeManager.hasClusterEverBeenMultiRack())) {
-            enoughRacks = true;
-            break;
-          }
-          String rackNameNew = cur.getNetworkLocation();
-          if (rackName == null) {
-            rackName = rackNameNew;
-          } else if (!rackName.equals(rackNameNew)) {
-            enoughRacks = true;
-            break;
-          }
-        }
+      if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()
+          && ((corruptNodes == null) || !corruptNodes.contains(cur))) {
+        liveNodes.add(cur);
       }
     }
-    return enoughRacks;
+    DatanodeInfo[] locs = liveNodes.toArray(new DatanodeInfo[liveNodes.size()]);
+    return blockplacement.verifyBlockPlacement(locs,
+        getReplication(b)).isPlacementPolicySatisfied();
   }
 
   /**
@@ -3578,7 +3504,7 @@ public class BlockManager {
    * or if it does not have enough racks.
    */
   private boolean isNeededReplication(Block b, int expected, int current) {
-    return current < expected || !blockHasEnoughRacks(b);
+    return current < expected || !isPlacementPolicySatisfied(b);
   }
 
   public long getMissingBlocksCount() {
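
The last two hunks supply the replacement predicate. isPlacementPolicySatisfied filters out decommissioning, decommissioned, and known-corrupt replicas, then hands the survivors to blockplacement.verifyBlockPlacement; isNeededReplication picks that up, so a block at full replica count but in a policy-violating layout still counts as needing work. A compact, self-contained model of that pipeline follows; the Node type and field names are illustrative stand-ins for the HDFS types, not the real API:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.List;

    class NeededWorkSketch {
      // Stand-in for the replica state DatanodeDescriptor exposes.
      static class Node {
        final boolean decommissioning, decommissioned, corrupt;
        Node(boolean decommissioning, boolean decommissioned, boolean corrupt) {
          this.decommissioning = decommissioning;
          this.decommissioned = decommissioned;
          this.corrupt = corrupt;
        }
      }

      // Mirror of the liveness filter in isPlacementPolicySatisfied.
      static List<Node> liveReplicas(Collection<Node> storages) {
        List<Node> live = new ArrayList<Node>();
        for (Node cur : storages) {
          if (!cur.decommissioning && !cur.decommissioned && !cur.corrupt) {
            live.add(cur);
          }
        }
        return live;
      }

      // Mirror of the patched isNeededReplication.
      static boolean isNeededReplication(int expected, int current,
          boolean placementSatisfied) {
        return current < expected || !placementSatisfied;
      }

      public static void main(String[] args) {
        List<Node> replicas = Arrays.asList(
            new Node(false, false, false),
            new Node(false, false, false),
            new Node(true, false, false)); // being decommissioned: not counted
        int live = liveReplicas(replicas).size(); // 2
        System.out.println(isNeededReplication(3, live, true));  // true: under-replicated
        System.out.println(isNeededReplication(2, live, false)); // true: mis-placed
        System.out.println(isNeededReplication(2, live, true));  // false
      }
    }
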