|
@@ -21,9 +21,8 @@ import static org.apache.hadoop.util.Time.now;
|
|
|
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
|
-import java.util.HashMap;
|
|
|
+import java.util.HashSet;
|
|
|
import java.util.List;
|
|
|
-import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
import java.util.TreeSet;
|
|
|
|
|
@@ -106,10 +105,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
|
@Override
|
|
|
public DatanodeDescriptor[] chooseTarget(String srcPath,
|
|
|
int numOfReplicas,
|
|
|
- DatanodeDescriptor writer,
|
|
|
+ Node writer,
|
|
|
List<DatanodeDescriptor> chosenNodes,
|
|
|
boolean returnChosenNodes,
|
|
|
- Map<Node, Node> excludedNodes,
|
|
|
+ Set<Node> excludedNodes,
|
|
|
long blocksize) {
|
|
|
return chooseTarget(numOfReplicas, writer, chosenNodes, returnChosenNodes,
|
|
|
excludedNodes, blocksize);
|
|
@@ -118,8 +117,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
|
@Override
|
|
|
DatanodeDescriptor[] chooseTarget(String src,
|
|
|
int numOfReplicas,
|
|
|
- DatanodeDescriptor writer,
|
|
|
- Map<Node, Node> excludedNodes,
|
|
|
+ Node writer,
|
|
|
+ Set<Node> excludedNodes,
|
|
|
long blocksize,
|
|
|
List<DatanodeDescriptor> favoredNodes) {
|
|
|
try {
|
|
@@ -130,8 +129,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
|
excludedNodes, blocksize);
|
|
|
}
|
|
|
|
|
|
- Map<Node, Node> favoriteAndExcludedNodes = excludedNodes == null ?
|
|
|
- new HashMap<Node, Node>() : new HashMap<Node, Node>(excludedNodes);
|
|
|
+ Set<Node> favoriteAndExcludedNodes = excludedNodes == null ?
|
|
|
+ new HashSet<Node>() : new HashSet<Node>(excludedNodes);
|
|
|
|
|
|
// Choose favored nodes
|
|
|
List<DatanodeDescriptor> results = new ArrayList<DatanodeDescriptor>();
|
|
@@ -150,10 +149,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
|
+ " with favored node " + favoredNode);
|
|
|
continue;
|
|
|
}
|
|
|
- favoriteAndExcludedNodes.put(target, target);
|
|
|
+ favoriteAndExcludedNodes.add(target);
|
|
|
}
|
|
|
|
|
|
- if (results.size() < numOfReplicas) {
|
|
|
+ if (results.size() < numOfReplicas) {
|
|
|
// Not enough favored nodes, choose other nodes.
|
|
|
numOfReplicas -= results.size();
|
|
|
DatanodeDescriptor[] remainingTargets =
|
|
@@ -175,17 +174,17 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
|
|
|
|
/** This is the implementation. */
|
|
|
private DatanodeDescriptor[] chooseTarget(int numOfReplicas,
|
|
|
- DatanodeDescriptor writer,
|
|
|
+ Node writer,
|
|
|
List<DatanodeDescriptor> chosenNodes,
|
|
|
boolean returnChosenNodes,
|
|
|
- Map<Node, Node> excludedNodes,
|
|
|
+ Set<Node> excludedNodes,
|
|
|
long blocksize) {
|
|
|
if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
|
|
|
return DatanodeDescriptor.EMPTY_ARRAY;
|
|
|
}
|
|
|
|
|
|
if (excludedNodes == null) {
|
|
|
- excludedNodes = new HashMap<Node, Node>();
|
|
|
+ excludedNodes = new HashSet<Node>();
|
|
|
}
|
|
|
|
|
|
int[] result = getMaxNodesPerRack(chosenNodes, numOfReplicas);
|
|
@@ -200,12 +199,12 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
|
}
|
|
|
|
|
|
if (!clusterMap.contains(writer)) {
|
|
|
- writer=null;
|
|
|
+ writer = null;
|
|
|
}
|
|
|
|
|
|
boolean avoidStaleNodes = (stats != null
|
|
|
&& stats.isAvoidingStaleDataNodesForWrite());
|
|
|
- DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer,
|
|
|
+ Node localNode = chooseTarget(numOfReplicas, writer,
|
|
|
excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
|
|
|
if (!returnChosenNodes) {
|
|
|
results.removeAll(chosenNodes);
|
|
@@ -228,10 +227,20 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
|
return new int[] {numOfReplicas, maxNodesPerRack};
|
|
|
}
|
|
|
|
|
|
- /* choose <i>numOfReplicas</i> from all data nodes */
|
|
|
- private DatanodeDescriptor chooseTarget(int numOfReplicas,
|
|
|
- DatanodeDescriptor writer,
|
|
|
- Map<Node, Node> excludedNodes,
|
|
|
+ /**
|
|
|
+ * choose <i>numOfReplicas</i> from all data nodes
|
|
|
+ * @param numOfReplicas additional number of replicas wanted
|
|
|
+ * @param writer the writer's machine, could be a non-DatanodeDescriptor node
|
|
|
+ * @param excludedNodes datanodes that should not be considered as targets
|
|
|
+ * @param blocksize size of the data to be written
|
|
|
+ * @param maxNodesPerRack max nodes allowed per rack
|
|
|
+ * @param results the target nodes already chosen
|
|
|
+ * @param avoidStaleNodes avoid stale nodes in replica choosing
|
|
|
+ * @return local node of writer (not chosen node)
|
|
|
+ */
|
|
|
+ private Node chooseTarget(int numOfReplicas,
|
|
|
+ Node writer,
|
|
|
+ Set<Node> excludedNodes,
|
|
|
long blocksize,
|
|
|
int maxNodesPerRack,
|
|
|
List<DatanodeDescriptor> results,
|
|
@@ -243,13 +252,13 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
|
|
|
|
int numOfResults = results.size();
|
|
|
boolean newBlock = (numOfResults==0);
|
|
|
- if (writer == null && !newBlock) {
|
|
|
+ if ((writer == null || !(writer instanceof DatanodeDescriptor)) && !newBlock) {
|
|
|
writer = results.get(0);
|
|
|
}
|
|
|
|
|
|
// Keep a copy of original excludedNodes
|
|
|
- final Map<Node, Node> oldExcludedNodes = avoidStaleNodes ?
|
|
|
- new HashMap<Node, Node>(excludedNodes) : null;
|
|
|
+ final Set<Node> oldExcludedNodes = avoidStaleNodes ?
|
|
|
+ new HashSet<Node>(excludedNodes) : null;
|
|
|
try {
|
|
|
if (numOfResults == 0) {
|
|
|
writer = chooseLocalNode(writer, excludedNodes, blocksize,
|
|
@@ -296,7 +305,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
|
// We need to additionally exclude the nodes that were added to the
|
|
|
// result list in the successful calls to choose*() above.
|
|
|
for (Node node : results) {
|
|
|
- oldExcludedNodes.put(node, node);
|
|
|
+ oldExcludedNodes.add(node);
|
|
|
}
|
|
|
// Set numOfReplicas, since it can get out of sync with the result list
|
|
|
// if the NotEnoughReplicasException was thrown in chooseRandom().
|
|
@@ -314,8 +323,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
|
* choose a node on the same rack
|
|
|
* @return the chosen node
|
|
|
*/
|
|
|
- protected DatanodeDescriptor chooseLocalNode(DatanodeDescriptor localMachine,
|
|
|
- Map<Node, Node> excludedNodes,
|
|
|
+ protected DatanodeDescriptor chooseLocalNode(Node localMachine,
|
|
|
+ Set<Node> excludedNodes,
|
|
|
long blocksize,
|
|
|
int maxNodesPerRack,
|
|
|
List<DatanodeDescriptor> results,
|
|
@@ -325,13 +334,13 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
|
if (localMachine == null)
|
|
|
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
|
|
|
maxNodesPerRack, results, avoidStaleNodes);
|
|
|
- if (preferLocalNode) {
|
|
|
+ if (preferLocalNode && localMachine instanceof DatanodeDescriptor) {
|
|
|
+ DatanodeDescriptor localDatanode = (DatanodeDescriptor) localMachine;
|
|
|
// otherwise try local machine first
|
|
|
- Node oldNode = excludedNodes.put(localMachine, localMachine);
|
|
|
- if (oldNode == null) { // was not in the excluded list
|
|
|
- if (addIfIsGoodTarget(localMachine, excludedNodes, blocksize,
|
|
|
+ if (excludedNodes.add(localMachine)) { // was not in the excluded list
|
|
|
+ if (addIfIsGoodTarget(localDatanode, excludedNodes, blocksize,
|
|
|
maxNodesPerRack, false, results, avoidStaleNodes) >= 0) {
|
|
|
- return localMachine;
|
|
|
+ return localDatanode;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -347,9 +356,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
|
* @return number of new excluded nodes
|
|
|
*/
|
|
|
protected int addToExcludedNodes(DatanodeDescriptor localMachine,
|
|
|
- Map<Node, Node> excludedNodes) {
|
|
|
- Node node = excludedNodes.put(localMachine, localMachine);
|
|
|
- return node == null?1:0;
|
|
|
+ Set<Node> excludedNodes) {
|
|
|
+ return excludedNodes.add(localMachine) ? 1 : 0;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -360,8 +368,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
|
* in the cluster.
|
|
|
* @return the chosen node
|
|
|
*/
|
|
|
- protected DatanodeDescriptor chooseLocalRack(DatanodeDescriptor localMachine,
|
|
|
- Map<Node, Node> excludedNodes,
|
|
|
+ protected DatanodeDescriptor chooseLocalRack(Node localMachine,
|
|
|
+ Set<Node> excludedNodes,
|
|
|
long blocksize,
|
|
|
int maxNodesPerRack,
|
|
|
List<DatanodeDescriptor> results,
|
|
@@ -412,7 +420,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
|
|
|
|
protected void chooseRemoteRack(int numOfReplicas,
|
|
|
DatanodeDescriptor localMachine,
|
|
|
- Map<Node, Node> excludedNodes,
|
|
|
+ Set<Node> excludedNodes,
|
|
|
long blocksize,
|
|
|
int maxReplicasPerRack,
|
|
|
List<DatanodeDescriptor> results,
|
|
@@ -436,7 +444,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
|
* @return the chosen node, if there is any.
|
|
|
*/
|
|
|
protected DatanodeDescriptor chooseRandom(String scope,
|
|
|
- Map<Node, Node> excludedNodes,
|
|
|
+ Set<Node> excludedNodes,
|
|
|
long blocksize,
|
|
|
int maxNodesPerRack,
|
|
|
List<DatanodeDescriptor> results,
|
|
@@ -452,7 +460,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
|
*/
|
|
|
protected DatanodeDescriptor chooseRandom(int numOfReplicas,
|
|
|
String scope,
|
|
|
- Map<Node, Node> excludedNodes,
|
|
|
+ Set<Node> excludedNodes,
|
|
|
long blocksize,
|
|
|
int maxNodesPerRack,
|
|
|
List<DatanodeDescriptor> results,
|
|
@@ -460,7 +468,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
|
throws NotEnoughReplicasException {
|
|
|
|
|
|
int numOfAvailableNodes = clusterMap.countNumOfAvailableNodes(
|
|
|
- scope, excludedNodes.keySet());
|
|
|
+ scope, excludedNodes);
|
|
|
StringBuilder builder = null;
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
builder = debugLoggingBuilder.get();
|
|
@@ -472,8 +480,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
|
while(numOfReplicas > 0 && numOfAvailableNodes > 0) {
|
|
|
DatanodeDescriptor chosenNode =
|
|
|
(DatanodeDescriptor)clusterMap.chooseRandom(scope);
|
|
|
- Node oldNode = excludedNodes.put(chosenNode, chosenNode);
|
|
|
- if (oldNode == null) {
|
|
|
+ if (excludedNodes.add(chosenNode)) { //was not in the excluded list
|
|
|
numOfAvailableNodes--;
|
|
|
|
|
|
int newExcludedNodes = addIfIsGoodTarget(chosenNode, excludedNodes,
|
|
@@ -506,16 +513,16 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
|
|
|
|
/**
|
|
|
* If the given node is a good target, add it to the result list and
|
|
|
- * update the excluded node map.
|
|
|
+ * update the set of excluded nodes.
|
|
|
* @return -1 if the given is not a good target;
|
|
|
- * otherwise, return the number of excluded nodes added to the map.
|
|
|
+ * otherwise, return the number of nodes added to excludedNodes set.
|
|
|
*/
|
|
|
int addIfIsGoodTarget(DatanodeDescriptor node,
|
|
|
- Map<Node, Node> excludedNodes,
|
|
|
+ Set<Node> excludedNodes,
|
|
|
long blockSize,
|
|
|
int maxNodesPerRack,
|
|
|
boolean considerLoad,
|
|
|
- List<DatanodeDescriptor> results,
|
|
|
+ List<DatanodeDescriptor> results,
|
|
|
boolean avoidStaleNodes) {
|
|
|
if (isGoodTarget(node, blockSize, maxNodesPerRack, considerLoad,
|
|
|
results, avoidStaleNodes)) {
|
|
@@ -614,7 +621,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
|
* starts from the writer and traverses all <i>nodes</i>
|
|
|
* This is basically a traveling salesman problem.
|
|
|
*/
|
|
|
- private DatanodeDescriptor[] getPipeline(DatanodeDescriptor writer,
|
|
|
+ private DatanodeDescriptor[] getPipeline(Node writer,
|
|
|
DatanodeDescriptor[] nodes) {
|
|
|
if (nodes.length==0) return nodes;
|
|
|
|