|
@@ -17,9 +17,18 @@
|
|
|
*/
|
|
|
package org.apache.hadoop.hdfs.server.namenode;
|
|
|
|
|
|
-import org.apache.commons.logging.*;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.Collection;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.Iterator;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Set;
|
|
|
+import java.util.TreeSet;
|
|
|
+
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
|
import org.apache.hadoop.hdfs.protocol.Block;
|
|
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|
|
import org.apache.hadoop.hdfs.protocol.FSConstants;
|
|
@@ -27,7 +36,6 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|
|
import org.apache.hadoop.net.NetworkTopology;
|
|
|
import org.apache.hadoop.net.Node;
|
|
|
import org.apache.hadoop.net.NodeBase;
|
|
|
-import java.util.*;
|
|
|
|
|
|
/** The class is responsible for choosing the desired number of targets
|
|
|
* for placing block replicas.
|
|
@@ -42,6 +50,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
|
private boolean considerLoad;
|
|
|
private NetworkTopology clusterMap;
|
|
|
private FSClusterStats stats;
|
|
|
+ private long staleInterval; // interval used to identify stale DataNodes
|
|
|
|
|
|
BlockPlacementPolicyDefault(Configuration conf, FSClusterStats stats,
|
|
|
NetworkTopology clusterMap) {
|
|
@@ -54,9 +63,13 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
|
/** {@inheritDoc} */
|
|
|
public void initialize(Configuration conf, FSClusterStats stats,
|
|
|
NetworkTopology clusterMap) {
|
|
|
- this.considerLoad = conf.getBoolean("dfs.replication.considerLoad", true);
|
|
|
+ this.considerLoad = conf.getBoolean(
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true);
|
|
|
this.stats = stats;
|
|
|
this.clusterMap = clusterMap;
|
|
|
+ this.staleInterval = conf.getLong(
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
|
|
|
}
|
|
|
|
|
|
/** {@inheritDoc} */
|
|
@@ -125,8 +138,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
|
writer=null;
|
|
|
}
|
|
|
|
|
|
- DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer,
|
|
|
- excludedNodes, blocksize, maxNodesPerRack, results);
|
|
|
+ boolean avoidStaleNodes = (stats != null && stats
|
|
|
+ .isAvoidingStaleDataNodesForWrite());
|
|
|
+ DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer,
|
|
|
+ excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
|
|
|
|
|
|
results.removeAll(chosenNodes);
|
|
|
|
|
@@ -137,11 +152,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
|
|
|
|
/* choose <i>numOfReplicas</i> from all data nodes */
|
|
|
private DatanodeDescriptor chooseTarget(int numOfReplicas,
|
|
|
- DatanodeDescriptor writer,
|
|
|
- HashMap<Node, Node> excludedNodes,
|
|
|
- long blocksize,
|
|
|
- int maxNodesPerRack,
|
|
|
- List<DatanodeDescriptor> results) {
|
|
|
+ DatanodeDescriptor writer, HashMap<Node, Node> excludedNodes,
|
|
|
+ long blocksize, int maxNodesPerRack, List<DatanodeDescriptor> results,
|
|
|
+ boolean avoidStaleNodes) {
|
|
|
|
|
|
if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
|
|
|
return writer;
|
|
@@ -152,42 +165,57 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
|
if (writer == null && !newBlock) {
|
|
|
writer = results.get(0);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ // Keep a copy of original excludedNodes
|
|
|
+ final HashMap<Node, Node> oldExcludedNodes = avoidStaleNodes ?
|
|
|
+ new HashMap<Node, Node>(excludedNodes) : null;
|
|
|
+
|
|
|
try {
|
|
|
if (numOfResults == 0) {
|
|
|
- writer = chooseLocalNode(writer, excludedNodes,
|
|
|
- blocksize, maxNodesPerRack, results);
|
|
|
+ writer = chooseLocalNode(writer, excludedNodes, blocksize,
|
|
|
+ maxNodesPerRack, results, avoidStaleNodes);
|
|
|
if (--numOfReplicas == 0) {
|
|
|
return writer;
|
|
|
}
|
|
|
}
|
|
|
if (numOfResults <= 1) {
|
|
|
- chooseRemoteRack(1, results.get(0), excludedNodes,
|
|
|
- blocksize, maxNodesPerRack, results);
|
|
|
+ chooseRemoteRack(1, results.get(0), excludedNodes, blocksize,
|
|
|
+ maxNodesPerRack, results, avoidStaleNodes);
|
|
|
if (--numOfReplicas == 0) {
|
|
|
return writer;
|
|
|
}
|
|
|
}
|
|
|
if (numOfResults <= 2) {
|
|
|
if (clusterMap.isOnSameRack(results.get(0), results.get(1))) {
|
|
|
- chooseRemoteRack(1, results.get(0), excludedNodes,
|
|
|
- blocksize, maxNodesPerRack, results);
|
|
|
+ chooseRemoteRack(1, results.get(0), excludedNodes, blocksize,
|
|
|
+ maxNodesPerRack, results, avoidStaleNodes);
|
|
|
} else if (newBlock){
|
|
|
- chooseLocalRack(results.get(1), excludedNodes, blocksize,
|
|
|
- maxNodesPerRack, results);
|
|
|
+ chooseLocalRack(results.get(1), excludedNodes, blocksize,
|
|
|
+ maxNodesPerRack, results, avoidStaleNodes);
|
|
|
} else {
|
|
|
- chooseLocalRack(writer, excludedNodes, blocksize,
|
|
|
- maxNodesPerRack, results);
|
|
|
+ chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
|
|
|
+ results, avoidStaleNodes);
|
|
|
}
|
|
|
if (--numOfReplicas == 0) {
|
|
|
return writer;
|
|
|
}
|
|
|
}
|
|
|
- chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes,
|
|
|
- blocksize, maxNodesPerRack, results);
|
|
|
+ chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
|
|
|
+ maxNodesPerRack, results, avoidStaleNodes);
|
|
|
} catch (NotEnoughReplicasException e) {
|
|
|
FSNamesystem.LOG.warn("Not able to place enough replicas, still in need of "
|
|
|
+ numOfReplicas);
|
|
|
+ if (avoidStaleNodes) {
|
|
|
+ // excludedNodes now has - initial excludedNodes, any nodes that were
|
|
|
+ // chosen and nodes that were tried but were not chosen because they
|
|
|
+ // were stale, decommissioned or for any other reason a node is not
|
|
|
+ // chosen for write. Retry again now not avoiding stale node
|
|
|
+ for (Node node : results) {
|
|
|
+ oldExcludedNodes.put(node, node);
|
|
|
+ }
|
|
|
+ return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
|
|
|
+ maxNodesPerRack, results, false);
|
|
|
+ }
|
|
|
}
|
|
|
return writer;
|
|
|
}
|
|
@@ -197,31 +225,28 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
|
* choose a node on the same rack
|
|
|
* @return the chosen node
|
|
|
*/
|
|
|
- private DatanodeDescriptor chooseLocalNode(
|
|
|
- DatanodeDescriptor localMachine,
|
|
|
- HashMap<Node, Node> excludedNodes,
|
|
|
- long blocksize,
|
|
|
- int maxNodesPerRack,
|
|
|
- List<DatanodeDescriptor> results)
|
|
|
+ private DatanodeDescriptor chooseLocalNode(DatanodeDescriptor localMachine,
|
|
|
+ HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
|
|
|
+ List<DatanodeDescriptor> results, boolean avoidStaleNodes)
|
|
|
throws NotEnoughReplicasException {
|
|
|
// if no local machine, randomly choose one node
|
|
|
if (localMachine == null)
|
|
|
- return chooseRandom(NodeBase.ROOT, excludedNodes,
|
|
|
- blocksize, maxNodesPerRack, results);
|
|
|
+ return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
|
|
|
+ maxNodesPerRack, results, avoidStaleNodes);
|
|
|
|
|
|
// otherwise try local machine first
|
|
|
Node oldNode = excludedNodes.put(localMachine, localMachine);
|
|
|
if (oldNode == null) { // was not in the excluded list
|
|
|
- if (isGoodTarget(localMachine, blocksize,
|
|
|
- maxNodesPerRack, false, results)) {
|
|
|
+ if (isGoodTarget(localMachine, blocksize, maxNodesPerRack, false,
|
|
|
+ results, avoidStaleNodes)) {
|
|
|
results.add(localMachine);
|
|
|
return localMachine;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// try a node on local rack
|
|
|
- return chooseLocalRack(localMachine, excludedNodes,
|
|
|
- blocksize, maxNodesPerRack, results);
|
|
|
+ return chooseLocalRack(localMachine, excludedNodes, blocksize,
|
|
|
+ maxNodesPerRack, results, avoidStaleNodes);
|
|
|
}
|
|
|
|
|
|
/* choose one node from the rack that <i>localMachine</i> is on.
|
|
@@ -231,24 +256,20 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
|
* in the cluster.
|
|
|
* @return the chosen node
|
|
|
*/
|
|
|
- private DatanodeDescriptor chooseLocalRack(
|
|
|
- DatanodeDescriptor localMachine,
|
|
|
- HashMap<Node, Node> excludedNodes,
|
|
|
- long blocksize,
|
|
|
- int maxNodesPerRack,
|
|
|
- List<DatanodeDescriptor> results)
|
|
|
+ private DatanodeDescriptor chooseLocalRack(DatanodeDescriptor localMachine,
|
|
|
+ HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
|
|
|
+ List<DatanodeDescriptor> results, boolean avoidStaleNodes)
|
|
|
throws NotEnoughReplicasException {
|
|
|
// no local machine, so choose a random machine
|
|
|
if (localMachine == null) {
|
|
|
- return chooseRandom(NodeBase.ROOT, excludedNodes,
|
|
|
- blocksize, maxNodesPerRack, results);
|
|
|
+ return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
|
|
|
+ maxNodesPerRack, results, avoidStaleNodes);
|
|
|
}
|
|
|
|
|
|
// choose one from the local rack
|
|
|
try {
|
|
|
- return chooseRandom(
|
|
|
- localMachine.getNetworkLocation(),
|
|
|
- excludedNodes, blocksize, maxNodesPerRack, results);
|
|
|
+ return chooseRandom(localMachine.getNetworkLocation(), excludedNodes,
|
|
|
+ blocksize, maxNodesPerRack, results, avoidStaleNodes);
|
|
|
} catch (NotEnoughReplicasException e1) {
|
|
|
// find the second replica
|
|
|
DatanodeDescriptor newLocal=null;
|
|
@@ -262,18 +283,17 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
|
}
|
|
|
if (newLocal != null) {
|
|
|
try {
|
|
|
- return chooseRandom(
|
|
|
- newLocal.getNetworkLocation(),
|
|
|
- excludedNodes, blocksize, maxNodesPerRack, results);
|
|
|
+ return chooseRandom(newLocal.getNetworkLocation(), excludedNodes,
|
|
|
+ blocksize, maxNodesPerRack, results, avoidStaleNodes);
|
|
|
} catch(NotEnoughReplicasException e2) {
|
|
|
- //otherwise randomly choose one from the network
|
|
|
- return chooseRandom(NodeBase.ROOT, excludedNodes,
|
|
|
- blocksize, maxNodesPerRack, results);
|
|
|
+ // otherwise randomly choose one from the network
|
|
|
+ return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
|
|
|
+ maxNodesPerRack, results, avoidStaleNodes);
|
|
|
}
|
|
|
} else {
|
|
|
//otherwise randomly choose one from the network
|
|
|
- return chooseRandom(NodeBase.ROOT, excludedNodes,
|
|
|
- blocksize, maxNodesPerRack, results);
|
|
|
+ return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
|
|
|
+ maxNodesPerRack, results, avoidStaleNodes);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -289,29 +309,31 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
|
HashMap<Node, Node> excludedNodes,
|
|
|
long blocksize,
|
|
|
int maxReplicasPerRack,
|
|
|
- List<DatanodeDescriptor> results)
|
|
|
+ List<DatanodeDescriptor> results,
|
|
|
+ boolean avoidStaleNodes)
|
|
|
throws NotEnoughReplicasException {
|
|
|
int oldNumOfReplicas = results.size();
|
|
|
// randomly choose one node from remote racks
|
|
|
try {
|
|
|
- chooseRandom(numOfReplicas, "~"+localMachine.getNetworkLocation(),
|
|
|
- excludedNodes, blocksize, maxReplicasPerRack, results);
|
|
|
+ chooseRandom(numOfReplicas, "~" + localMachine.getNetworkLocation(),
|
|
|
+ excludedNodes, blocksize, maxReplicasPerRack, results,
|
|
|
+ avoidStaleNodes);
|
|
|
} catch (NotEnoughReplicasException e) {
|
|
|
chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
|
|
|
localMachine.getNetworkLocation(), excludedNodes, blocksize,
|
|
|
- maxReplicasPerRack, results);
|
|
|
+ maxReplicasPerRack, results, avoidStaleNodes);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/* Randomly choose one target from <i>nodes</i>.
|
|
|
* @return the chosen node
|
|
|
*/
|
|
|
- private DatanodeDescriptor chooseRandom(
|
|
|
- String nodes,
|
|
|
+ private DatanodeDescriptor chooseRandom(String nodes,
|
|
|
HashMap<Node, Node> excludedNodes,
|
|
|
long blocksize,
|
|
|
int maxNodesPerRack,
|
|
|
- List<DatanodeDescriptor> results)
|
|
|
+ List<DatanodeDescriptor> results,
|
|
|
+ boolean avoidStaleNodes)
|
|
|
throws NotEnoughReplicasException {
|
|
|
int numOfAvailableNodes =
|
|
|
clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet());
|
|
@@ -322,7 +344,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
|
Node oldNode = excludedNodes.put(chosenNode, chosenNode);
|
|
|
if (oldNode == null) { // choosendNode was not in the excluded list
|
|
|
numOfAvailableNodes--;
|
|
|
- if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
|
|
|
+ if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results,
|
|
|
+ avoidStaleNodes)) {
|
|
|
results.add(chosenNode);
|
|
|
return chosenNode;
|
|
|
}
|
|
@@ -340,7 +363,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
|
HashMap<Node, Node> excludedNodes,
|
|
|
long blocksize,
|
|
|
int maxNodesPerRack,
|
|
|
- List<DatanodeDescriptor> results)
|
|
|
+ List<DatanodeDescriptor> results,
|
|
|
+ boolean avoidStaleNodes)
|
|
|
throws NotEnoughReplicasException {
|
|
|
|
|
|
int numOfAvailableNodes =
|
|
@@ -352,7 +376,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
|
if (oldNode == null) {
|
|
|
numOfAvailableNodes--;
|
|
|
|
|
|
- if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
|
|
|
+ if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results,
|
|
|
+ avoidStaleNodes)) {
|
|
|
numOfReplicas--;
|
|
|
results.add(chosenNode);
|
|
|
}
|
|
@@ -369,17 +394,34 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
|
* return true if <i>node</i> has enough space,
|
|
|
* does not have too much load, and the rack does not have too many nodes
|
|
|
*/
|
|
|
- private boolean isGoodTarget(DatanodeDescriptor node,
|
|
|
- long blockSize, int maxTargetPerLoc,
|
|
|
- List<DatanodeDescriptor> results) {
|
|
|
- return isGoodTarget(node, blockSize, maxTargetPerLoc,
|
|
|
- this.considerLoad, results);
|
|
|
+ private boolean isGoodTarget(DatanodeDescriptor node, long blockSize,
|
|
|
+ int maxTargetPerLoc, List<DatanodeDescriptor> results,
|
|
|
+ boolean avoidStaleNodes) {
|
|
|
+ return isGoodTarget(node, blockSize, maxTargetPerLoc, this.considerLoad,
|
|
|
+ results, avoidStaleNodes);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Determine if a node is a good target.
|
|
|
+ *
|
|
|
+ * @param node The target node
|
|
|
+ * @param blockSize Size of block
|
|
|
+ * @param maxTargetPerLoc Maximum number of targets per rack. The value of
|
|
|
+ * this parameter depends on the number of racks in
|
|
|
+ * the cluster and total number of replicas for a block
|
|
|
+ * @param considerLoad whether or not to consider load of the target node
|
|
|
+ * @param results A list containing currently chosen nodes. Used to check if
|
|
|
+ * too many nodes has been chosen in the target rack.
|
|
|
+ * @param avoidStaleNodes Whether or not to avoid choosing stale nodes.
|
|
|
+ * @return Return true if <i>node</i> has enough space,
|
|
|
+ * does not have too much load,
|
|
|
+ * and the rack does not have too many nodes.
|
|
|
+ */
|
|
|
private boolean isGoodTarget(DatanodeDescriptor node,
|
|
|
long blockSize, int maxTargetPerLoc,
|
|
|
boolean considerLoad,
|
|
|
- List<DatanodeDescriptor> results) {
|
|
|
+ List<DatanodeDescriptor> results,
|
|
|
+ boolean avoidStaleNodes) {
|
|
|
Log logr = FSNamesystem.LOG;
|
|
|
// check if the node is (being) decommissed
|
|
|
if (node.isDecommissionInProgress() || node.isDecommissioned()) {
|
|
@@ -388,6 +430,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
+ if (avoidStaleNodes) {
|
|
|
+ if (node.isStale(this.staleInterval)) {
|
|
|
+ logr.debug("Node "+NodeBase.getPath(node)+
|
|
|
+ " is not chosen because the node is (being) stale");
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
long remaining = node.getRemaining() -
|
|
|
(node.getBlocksScheduled() * blockSize);
|
|
|
// check the remaining capacity of the target machine
|