|
@@ -54,16 +54,79 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
|
|
super.initialize(conf, stats, clusterMap, host2datanodeMap);
|
|
super.initialize(conf, stats, clusterMap, host2datanodeMap);
|
|
}
|
|
}
|
|
|
|
|
|
- /** choose local node of localMachine as the target.
|
|
|
|
- * if localMachine is not available, choose a node on the same nodegroup or
|
|
|
|
- * rack instead.
|
|
|
|
|
|
+ /**
|
|
|
|
+ * choose all good favored nodes as target.
|
|
|
|
+ * If no enough targets, then choose one replica from
|
|
|
|
+ * each bad favored node's node group.
|
|
|
|
+ * @throws NotEnoughReplicasException
|
|
|
|
+ */
|
|
|
|
+ @Override
|
|
|
|
+ protected void chooseFavouredNodes(String src, int numOfReplicas,
|
|
|
|
+ List<DatanodeDescriptor> favoredNodes,
|
|
|
|
+ Set<Node> favoriteAndExcludedNodes, long blocksize,
|
|
|
|
+ int maxNodesPerRack, List<DatanodeStorageInfo> results,
|
|
|
|
+ boolean avoidStaleNodes, EnumMap<StorageType, Integer> storageTypes)
|
|
|
|
+ throws NotEnoughReplicasException {
|
|
|
|
+ super.chooseFavouredNodes(src, numOfReplicas, favoredNodes,
|
|
|
|
+ favoriteAndExcludedNodes, blocksize, maxNodesPerRack, results,
|
|
|
|
+ avoidStaleNodes, storageTypes);
|
|
|
|
+ if (results.size() < numOfReplicas) {
|
|
|
|
+ // Not enough replicas, choose from unselected Favorednode's Nodegroup
|
|
|
|
+ for (int i = 0;
|
|
|
|
+ i < favoredNodes.size() && results.size() < numOfReplicas; i++) {
|
|
|
|
+ DatanodeDescriptor favoredNode = favoredNodes.get(i);
|
|
|
|
+ boolean chosenNode =
|
|
|
|
+ isNodeChosen(results, favoredNode);
|
|
|
|
+ if (chosenNode) {
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+ NetworkTopologyWithNodeGroup clusterMapNodeGroup =
|
|
|
|
+ (NetworkTopologyWithNodeGroup) clusterMap;
|
|
|
|
+ // try a node on FavouredNode's node group
|
|
|
|
+ DatanodeStorageInfo target = null;
|
|
|
|
+ String scope =
|
|
|
|
+ clusterMapNodeGroup.getNodeGroup(favoredNode.getNetworkLocation());
|
|
|
|
+ try {
|
|
|
|
+ target =
|
|
|
|
+ chooseRandom(scope, favoriteAndExcludedNodes, blocksize,
|
|
|
|
+ maxNodesPerRack, results, avoidStaleNodes, storageTypes);
|
|
|
|
+ } catch (NotEnoughReplicasException e) {
|
|
|
|
+ // catch Exception and continue with other favored nodes
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+ if (target == null) {
|
|
|
|
+ LOG.warn("Could not find a target for file "
|
|
|
|
+ + src + " within nodegroup of favored node " + favoredNode);
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+ favoriteAndExcludedNodes.add(target.getDatanodeDescriptor());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private boolean isNodeChosen(
|
|
|
|
+ List<DatanodeStorageInfo> results, DatanodeDescriptor favoredNode) {
|
|
|
|
+ boolean chosenNode = false;
|
|
|
|
+ for (int j = 0; j < results.size(); j++) {
|
|
|
|
+ if (results.get(j).getDatanodeDescriptor().equals(favoredNode)) {
|
|
|
|
+ chosenNode = true;
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ return chosenNode;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /** choose local node of <i>localMachine</i> as the target.
|
|
|
|
+ * If localMachine is not available, will fallback to nodegroup/rack
|
|
|
|
+ * when flag <i>fallbackToNodeGroupAndLocalRack</i> is set.
|
|
* @return the chosen node
|
|
* @return the chosen node
|
|
*/
|
|
*/
|
|
@Override
|
|
@Override
|
|
protected DatanodeStorageInfo chooseLocalStorage(Node localMachine,
|
|
protected DatanodeStorageInfo chooseLocalStorage(Node localMachine,
|
|
Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
|
|
Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
|
|
List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
|
|
List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
|
|
- EnumMap<StorageType, Integer> storageTypes, boolean fallbackToLocalRack)
|
|
|
|
|
|
+ EnumMap<StorageType, Integer> storageTypes,
|
|
|
|
+ boolean fallbackToNodeGroupAndLocalRack)
|
|
throws NotEnoughReplicasException {
|
|
throws NotEnoughReplicasException {
|
|
DatanodeStorageInfo localStorage = chooseLocalStorage(localMachine,
|
|
DatanodeStorageInfo localStorage = chooseLocalStorage(localMachine,
|
|
excludedNodes, blocksize, maxNodesPerRack, results,
|
|
excludedNodes, blocksize, maxNodesPerRack, results,
|
|
@@ -72,6 +135,9 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
|
|
return localStorage;
|
|
return localStorage;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ if (!fallbackToNodeGroupAndLocalRack) {
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
// try a node on local node group
|
|
// try a node on local node group
|
|
DatanodeStorageInfo chosenStorage = chooseLocalNodeGroup(
|
|
DatanodeStorageInfo chosenStorage = chooseLocalNodeGroup(
|
|
(NetworkTopologyWithNodeGroup)clusterMap, localMachine, excludedNodes,
|
|
(NetworkTopologyWithNodeGroup)clusterMap, localMachine, excludedNodes,
|
|
@@ -79,10 +145,6 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
|
|
if (chosenStorage != null) {
|
|
if (chosenStorage != null) {
|
|
return chosenStorage;
|
|
return chosenStorage;
|
|
}
|
|
}
|
|
-
|
|
|
|
- if (!fallbackToLocalRack) {
|
|
|
|
- return null;
|
|
|
|
- }
|
|
|
|
// try a node on local rack
|
|
// try a node on local rack
|
|
return chooseLocalRack(localMachine, excludedNodes,
|
|
return chooseLocalRack(localMachine, excludedNodes,
|
|
blocksize, maxNodesPerRack, results, avoidStaleNodes, storageTypes);
|
|
blocksize, maxNodesPerRack, results, avoidStaleNodes, storageTypes);
|