@@ -239,9 +239,6 @@ public class BlockManager implements BlockStatsMXBean {
   final float blocksInvalidateWorkPct;
   final int blocksReplWorkMultiplier;

-  /** variable to enable check for enough racks */
-  final boolean shouldCheckForEnoughRacks;
-
   // whether or not to issue block encryption keys.
   final boolean encryptDataTransfer;

@@ -339,10 +336,6 @@ public class BlockManager implements BlockStatsMXBean {
         conf.getInt(
             DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
             DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT);
-    this.shouldCheckForEnoughRacks =
-        conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) == null
-            ? false : true;
-
     this.blocksInvalidateWorkPct = DFSUtil.getInvalidateWorkPctPerIteration(conf);
     this.blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);

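The ternary removed above reduced to a null check on the topology script key, so the old flag meant nothing more than "a topology script is configured"; with it gone, single-rack handling is left to the placement policy instead of being switched off up front. For reference, the deleted initialization is equivalent to this small helper (a sketch, not part of the patch):

    // Equivalent, simplified form of the deleted initialization: the flag was
    // simply "a topology script is configured".
    static boolean shouldCheckForEnoughRacks(Configuration conf) {
      return conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) != null;
    }
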
@@ -366,7 +359,6 @@ public class BlockManager implements BlockStatsMXBean {
     LOG.info("maxReplication = " + maxReplication);
     LOG.info("minReplication = " + minReplication);
     LOG.info("maxReplicationStreams = " + maxReplicationStreams);
-    LOG.info("shouldCheckForEnoughRacks = " + shouldCheckForEnoughRacks);
     LOG.info("replicationRecheckInterval = " + replicationRecheckInterval);
     LOG.info("encryptDataTransfer = " + encryptDataTransfer);
     LOG.info("maxNumBlocksToLog = " + maxNumBlocksToLog);
@@ -1432,7 +1424,7 @@ public class BlockManager implements BlockStatsMXBean {
       NumberReplicas numReplicas, int pendingReplicaNum, int required) {
     int numEffectiveReplicas = numReplicas.liveReplicas() + pendingReplicaNum;
     return (numEffectiveReplicas >= required) &&
-        (pendingReplicaNum > 0 || blockHasEnoughRacks(block));
+        (pendingReplicaNum > 0 || isPlacementPolicySatisfied(block));
   }

   private ReplicationWork scheduleReplication(BlockInfo block, int priority) {
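A block now counts as sufficiently replicated when live plus pending replicas reach the target and either replication work is already in flight or the configured placement policy accepts the current layout; the pending check short-circuits the placement test because the pending targets were themselves chosen by the policy. A minimal standalone sketch of the predicate, with the HDFS types replaced by plain values for illustration:

    static boolean hasEnoughEffectiveReplicas(int liveReplicas,
        int pendingReplicaNum, int required, boolean placementSatisfied) {
      int numEffectiveReplicas = liveReplicas + pendingReplicaNum;
      return numEffectiveReplicas >= required
          && (pendingReplicaNum > 0 || placementSatisfied);
    }

    // With required = 3:
    //   (3 live, 0 pending, placement OK)      -> true: fully replicated, well placed
    //   (2 live, 1 pending, placement not yet) -> true: replication already scheduled
    //   (3 live, 0 pending, all on one rack)   -> false: still treated as needing work
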
@@ -1512,7 +1504,7 @@ public class BlockManager implements BlockStatsMXBean {

       DatanodeStorageInfo[] targets = rw.getTargets();
       if ( (numReplicas.liveReplicas() >= requiredReplication) &&
-          (!blockHasEnoughRacks(block)) ) {
+          (!isPlacementPolicySatisfied(block)) ) {
         if (rw.getSrcNode().getNetworkLocation().equals(
             targets[0].getDatanodeDescriptor().getNetworkLocation())) {
           //No use continuing, unless a new rack in this case
@@ -2897,7 +2889,7 @@ public class BlockManager implements BlockStatsMXBean {
    * If there are any extras, call chooseExcessReplicates() to
    * mark them in the excessReplicateMap.
    */
-  private void processOverReplicatedBlock(final Block block,
+  private void processOverReplicatedBlock(final BlockInfo block,
       final short replication, final DatanodeDescriptor addedNode,
       DatanodeDescriptor delNodeHint) {
     assert namesystem.hasWriteLock();
@@ -2930,110 +2922,48 @@ public class BlockManager implements BlockStatsMXBean {
       }
     }
     chooseExcessReplicates(nonExcess, block, replication,
-        addedNode, delNodeHint, blockplacement);
+        addedNode, delNodeHint);
   }

-
-  /**
-   * We want "replication" replicates for the block, but we now have too many.
-   * In this method, copy enough nodes from 'srcNodes' into 'dstNodes' such that:
-   *
-   * srcNodes.size() - dstNodes.size() == replication
-   *
-   * We pick node that make sure that replicas are spread across racks and
-   * also try hard to pick one with least free space.
-   * The algorithm is first to pick a node with least free space from nodes
-   * that are on a rack holding more than one replicas of the block.
-   * So removing such a replica won't remove a rack.
-   * If no such a node is available,
-   * then pick a node with least free space
-   */
-  private void chooseExcessReplicates(final Collection<DatanodeStorageInfo> nonExcess,
-      Block b, short replication,
-      DatanodeDescriptor addedNode,
-      DatanodeDescriptor delNodeHint,
-      BlockPlacementPolicy replicator) {
+  private void chooseExcessReplicates(
+      final Collection<DatanodeStorageInfo> nonExcess,
+      BlockInfo storedBlock, short replication,
+      DatanodeDescriptor addedNode,
+      DatanodeDescriptor delNodeHint) {
     assert namesystem.hasWriteLock();
     // first form a rack to datanodes map and
-    BlockInfo bi = getStoredBlock(b);
-    BlockCollection bc = getBlockCollection(bi);
-    final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(bc.getStoragePolicyID());
+    BlockCollection bc = getBlockCollection(storedBlock);
+    final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(
+        bc.getStoragePolicyID());
     final List<StorageType> excessTypes = storagePolicy.chooseExcess(
         replication, DatanodeStorageInfo.toStorageTypes(nonExcess));
-
-
-    final Map<String, List<DatanodeStorageInfo>> rackMap
-        = new HashMap<String, List<DatanodeStorageInfo>>();
-    final List<DatanodeStorageInfo> moreThanOne = new ArrayList<DatanodeStorageInfo>();
-    final List<DatanodeStorageInfo> exactlyOne = new ArrayList<DatanodeStorageInfo>();
-
-    // split nodes into two sets
-    // moreThanOne contains nodes on rack with more than one replica
-    // exactlyOne contains the remaining nodes
-    replicator.splitNodesWithRack(nonExcess, rackMap, moreThanOne, exactlyOne);
-
-    // pick one node to delete that favors the delete hint
-    // otherwise pick one with least space from priSet if it is not empty
-    // otherwise one node with least space from remains
-    boolean firstOne = true;
-    final DatanodeStorageInfo delNodeHintStorage
-        = DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, delNodeHint);
-    final DatanodeStorageInfo addedNodeStorage
-        = DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, addedNode);
-    while (nonExcess.size() - replication > 0) {
-      final DatanodeStorageInfo cur;
-      if (useDelHint(firstOne, delNodeHintStorage, addedNodeStorage,
-          moreThanOne, excessTypes)) {
-        cur = delNodeHintStorage;
-      } else { // regular excessive replica removal
-        cur = replicator.chooseReplicaToDelete(bc, b, replication,
-            moreThanOne, exactlyOne, excessTypes);
-      }
-      firstOne = false;
-
-      // adjust rackmap, moreThanOne, and exactlyOne
-      replicator.adjustSetsWithChosenReplica(rackMap, moreThanOne,
-          exactlyOne, cur);
-
-      nonExcess.remove(cur);
-      addToExcessReplicate(cur.getDatanodeDescriptor(), b);
-
-      //
-      // The 'excessblocks' tracks blocks until we get confirmation
-      // that the datanode has deleted them; the only way we remove them
-      // is when we get a "removeBlock" message.
-      //
-      // The 'invalidate' list is used to inform the datanode the block
-      // should be deleted. Items are removed from the invalidate list
-      // upon giving instructions to the namenode.
-      //
-      addToInvalidates(b, cur.getDatanodeDescriptor());
-      blockLog.debug("BLOCK* chooseExcessReplicates: "
-          +"({}, {}) is added to invalidated blocks set", cur, b);
-    }
-  }
-
-  /** Check if we can use delHint */
-  static boolean useDelHint(boolean isFirst, DatanodeStorageInfo delHint,
-      DatanodeStorageInfo added, List<DatanodeStorageInfo> moreThan1Racks,
-      List<StorageType> excessTypes) {
-    if (!isFirst) {
-      return false; // only consider delHint for the first case
-    } else if (delHint == null) {
-      return false; // no delHint
-    } else if (!excessTypes.contains(delHint.getStorageType())) {
-      return false; // delHint storage type is not an excess type
-    } else {
-      // check if removing delHint reduces the number of racks
-      if (moreThan1Racks.contains(delHint)) {
-        return true; // delHint and some other nodes are under the same rack
-      } else if (added != null && !moreThan1Racks.contains(added)) {
-        return true; // the added node adds a new rack
-      }
-      return false; // removing delHint reduces the number of racks;
+    List<DatanodeStorageInfo> replicasToDelete = blockplacement
+        .chooseReplicasToDelete(nonExcess, replication, excessTypes,
+            addedNode, delNodeHint);
+    for (DatanodeStorageInfo choosenReplica : replicasToDelete) {
+      processChosenExcessReplica(nonExcess, choosenReplica, storedBlock);
     }
   }

+  private void processChosenExcessReplica(
+      final Collection<DatanodeStorageInfo> nonExcess,
+      final DatanodeStorageInfo chosen, BlockInfo storedBlock) {
+    nonExcess.remove(chosen);
+    addToExcessReplicate(chosen.getDatanodeDescriptor(), storedBlock);
+    //
+    // The 'excessblocks' tracks blocks until we get confirmation
+    // that the datanode has deleted them; the only way we remove them
+    // is when we get a "removeBlock" message.
+    //
+    // The 'invalidate' list is used to inform the datanode the block
+    // should be deleted. Items are removed from the invalidate list
+    // upon giving instructions to the datanodes.
+    //
+    addToInvalidates(storedBlock, chosen.getDatanodeDescriptor());
+    blockLog.debug("BLOCK* chooseExcessReplicates: "
+        + "({}, {}) is added to invalidated blocks set", chosen, storedBlock);
+  }
+
   private void addToExcessReplicate(DatanodeInfo dn, Block block) {
     assert namesystem.hasWriteLock();
     LightWeightHashSet<Block> excessBlocks = excessReplicateMap.get(
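The useDelHint helper disappears from BlockManager along with the selection loop; the delete-hint heuristic (honour the hint only when dropping it cannot cost a rack) now lives behind BlockPlacementPolicy#chooseReplicasToDelete together with the rack-split and least-free-space logic. For reference, a self-contained restatement of the removed helper:

    // Restatement of the removed useDelHint: the hint is honoured only on the
    // first pick, only if its storage type is actually in excess, and only if
    // removing it cannot reduce the number of racks covered by the block.
    static boolean useDelHint(boolean isFirst, DatanodeStorageInfo delHint,
        DatanodeStorageInfo added, List<DatanodeStorageInfo> moreThan1Racks,
        List<StorageType> excessTypes) {
      if (!isFirst) {
        return false;   // only consider the hint for the first removal
      } else if (delHint == null) {
        return false;   // no hint was given
      } else if (!excessTypes.contains(delHint.getStorageType())) {
        return false;   // the hint's storage type is not in excess
      } else if (moreThan1Racks.contains(delHint)) {
        return true;    // the hint shares its rack with another replica
      } else if (added != null && !moreThan1Racks.contains(added)) {
        return true;    // the newly added node contributes a new rack
      }
      return false;     // removing the hint would lose a rack
    }
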
@@ -3560,36 +3490,20 @@ public class BlockManager implements BlockStatsMXBean {
     return toInvalidate.size();
   }

-  boolean blockHasEnoughRacks(BlockInfo b) {
-    if (!this.shouldCheckForEnoughRacks) {
-      return true;
-    }
-    boolean enoughRacks = false;;
-    Collection<DatanodeDescriptor> corruptNodes =
-        corruptReplicas.getNodes(b);
-    int numExpectedReplicas = getReplication(b);
-    String rackName = null;
-    for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
+  boolean isPlacementPolicySatisfied(BlockInfo storedBlock) {
+    List<DatanodeDescriptor> liveNodes = new ArrayList<>();
+    Collection<DatanodeDescriptor> corruptNodes = corruptReplicas
+        .getNodes(storedBlock);
+    for (DatanodeStorageInfo storage : blocksMap.getStorages(storedBlock)) {
       final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
-      if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
-        if ((corruptNodes == null ) || !corruptNodes.contains(cur)) {
-          if (numExpectedReplicas == 1 ||
-              (numExpectedReplicas > 1 &&
-                  !datanodeManager.hasClusterEverBeenMultiRack())) {
-            enoughRacks = true;
-            break;
-          }
-          String rackNameNew = cur.getNetworkLocation();
-          if (rackName == null) {
-            rackName = rackNameNew;
-          } else if (!rackName.equals(rackNameNew)) {
-            enoughRacks = true;
-            break;
-          }
-        }
+      if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()
+          && ((corruptNodes == null) || !corruptNodes.contains(cur))) {
+        liveNodes.add(cur);
       }
     }
-    return enoughRacks;
+    DatanodeInfo[] locs = liveNodes.toArray(new DatanodeInfo[liveNodes.size()]);
+    return blockplacement.verifyBlockPlacement(locs,
+        storedBlock.getReplication()).isPlacementPolicySatisfied();
   }

   /**
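Instead of counting distinct rack names by hand, the new helper collects the live, non-corrupt replica locations and asks the configured BlockPlacementPolicy, whose verifyBlockPlacement returns a BlockPlacementStatus rather than a bare boolean. A rough sketch of how a rack-based policy can evaluate such a set — an assumption about the default policy's internals, not part of this patch:

    // Sketch only: approximate shape of a rack-based verifyBlockPlacement.
    // The real BlockPlacementPolicyDefault also accounts for clusters that
    // have never had more than one rack, so single-rack deployments pass.
    BlockPlacementStatus verifyBlockPlacement(DatanodeInfo[] locs,
        int numberOfReplicas) {
      Set<String> racks = new HashSet<>();
      for (DatanodeInfo dn : locs) {
        racks.add(dn.getNetworkLocation());          // distinct racks in use
      }
      int minRacks = Math.min(2, numberOfReplicas);  // at least min(2, replication) racks
      return new BlockPlacementStatusDefault(racks.size(), minRacks);
    }
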
@@ -3598,7 +3512,7 @@ public class BlockManager implements BlockStatsMXBean {
    */
   boolean isNeededReplication(BlockInfo storedBlock, int current) {
     int expected = storedBlock.getReplication();
-    return current < expected || !blockHasEnoughRacks(storedBlock);
+    return current < expected || !isPlacementPolicySatisfied(storedBlock);
   }

   public short getExpectedReplicaNum(BlockInfo block) {