|
@@ -437,17 +437,11 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
maxNodesPerRack, results, avoidStaleNodes, storageTypes);
|
|
maxNodesPerRack, results, avoidStaleNodes, storageTypes);
|
|
return writer;
|
|
return writer;
|
|
}
|
|
}
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Choose <i>localMachine</i> as the target.
|
|
|
|
- * if <i>localMachine</i> is not available,
|
|
|
|
- * choose a node on the same rack
|
|
|
|
- * @return the chosen storage
|
|
|
|
- */
|
|
|
|
|
|
+
|
|
protected DatanodeStorageInfo chooseLocalStorage(Node localMachine,
|
|
protected DatanodeStorageInfo chooseLocalStorage(Node localMachine,
|
|
Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
|
|
Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
|
|
List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
|
|
List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
|
|
- EnumMap<StorageType, Integer> storageTypes, boolean fallbackToLocalRack)
|
|
|
|
|
|
+ EnumMap<StorageType, Integer> storageTypes)
|
|
throws NotEnoughReplicasException {
|
|
throws NotEnoughReplicasException {
|
|
// if no local machine, randomly choose one node
|
|
// if no local machine, randomly choose one node
|
|
if (localMachine == null) {
|
|
if (localMachine == null) {
|
|
@@ -458,7 +452,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
&& clusterMap.contains(localMachine)) {
|
|
&& clusterMap.contains(localMachine)) {
|
|
DatanodeDescriptor localDatanode = (DatanodeDescriptor) localMachine;
|
|
DatanodeDescriptor localDatanode = (DatanodeDescriptor) localMachine;
|
|
// otherwise try local machine first
|
|
// otherwise try local machine first
|
|
- if (excludedNodes.add(localMachine)) { // was not in the excluded list
|
|
|
|
|
|
+ if (excludedNodes.add(localMachine) // was not in the excluded list
|
|
|
|
+ && isGoodDatanode(localDatanode, maxNodesPerRack, false,
|
|
|
|
+ results, avoidStaleNodes)) {
|
|
for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes
|
|
for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes
|
|
.entrySet().iterator(); iter.hasNext(); ) {
|
|
.entrySet().iterator(); iter.hasNext(); ) {
|
|
Map.Entry<StorageType, Integer> entry = iter.next();
|
|
Map.Entry<StorageType, Integer> entry = iter.next();
|
|
@@ -466,7 +462,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
localDatanode.getStorageInfos())) {
|
|
localDatanode.getStorageInfos())) {
|
|
StorageType type = entry.getKey();
|
|
StorageType type = entry.getKey();
|
|
if (addIfIsGoodTarget(localStorage, excludedNodes, blocksize,
|
|
if (addIfIsGoodTarget(localStorage, excludedNodes, blocksize,
|
|
- maxNodesPerRack, false, results, avoidStaleNodes, type) >= 0) {
|
|
|
|
|
|
+ results, type) >= 0) {
|
|
int num = entry.getValue();
|
|
int num = entry.getValue();
|
|
if (num == 1) {
|
|
if (num == 1) {
|
|
iter.remove();
|
|
iter.remove();
|
|
@@ -479,6 +475,26 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Choose <i>localMachine</i> as the target.
|
|
|
|
+ * if <i>localMachine</i> is not available,
|
|
|
|
+ * choose a node on the same rack
|
|
|
|
+ * @return the chosen storage
|
|
|
|
+ */
|
|
|
|
+ protected DatanodeStorageInfo chooseLocalStorage(Node localMachine,
|
|
|
|
+ Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
|
|
|
|
+ List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
|
|
|
|
+ EnumMap<StorageType, Integer> storageTypes, boolean fallbackToLocalRack)
|
|
|
|
+ throws NotEnoughReplicasException {
|
|
|
|
+ DatanodeStorageInfo localStorage = chooseLocalStorage(localMachine,
|
|
|
|
+ excludedNodes, blocksize, maxNodesPerRack, results,
|
|
|
|
+ avoidStaleNodes, storageTypes);
|
|
|
|
+ if (localStorage != null) {
|
|
|
|
+ return localStorage;
|
|
|
|
+ }
|
|
|
|
|
|
if (!fallbackToLocalRack) {
|
|
if (!fallbackToLocalRack) {
|
|
return null;
|
|
return null;
|
|
@@ -653,6 +669,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
builder.append("\nNode ").append(NodeBase.getPath(chosenNode)).append(" [");
|
|
builder.append("\nNode ").append(NodeBase.getPath(chosenNode)).append(" [");
|
|
}
|
|
}
|
|
numOfAvailableNodes--;
|
|
numOfAvailableNodes--;
|
|
|
|
+ if (!isGoodDatanode(chosenNode, maxNodesPerRack, considerLoad,
|
|
|
|
+ results, avoidStaleNodes)) {
|
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
|
+ builder.append("\n]");
|
|
|
|
+ }
|
|
|
|
+ badTarget = true;
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
|
|
final DatanodeStorageInfo[] storages = DFSUtil.shuffle(
|
|
final DatanodeStorageInfo[] storages = DFSUtil.shuffle(
|
|
chosenNode.getStorageInfos());
|
|
chosenNode.getStorageInfos());
|
|
@@ -664,8 +688,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
for (i = 0; i < storages.length; i++) {
|
|
for (i = 0; i < storages.length; i++) {
|
|
StorageType type = entry.getKey();
|
|
StorageType type = entry.getKey();
|
|
final int newExcludedNodes = addIfIsGoodTarget(storages[i],
|
|
final int newExcludedNodes = addIfIsGoodTarget(storages[i],
|
|
- excludedNodes, blocksize, maxNodesPerRack, considerLoad, results,
|
|
|
|
- avoidStaleNodes, type);
|
|
|
|
|
|
+ excludedNodes, blocksize, results, type);
|
|
if (newExcludedNodes >= 0) {
|
|
if (newExcludedNodes >= 0) {
|
|
numOfReplicas--;
|
|
numOfReplicas--;
|
|
if (firstChosen == null) {
|
|
if (firstChosen == null) {
|
|
@@ -725,13 +748,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
int addIfIsGoodTarget(DatanodeStorageInfo storage,
|
|
int addIfIsGoodTarget(DatanodeStorageInfo storage,
|
|
Set<Node> excludedNodes,
|
|
Set<Node> excludedNodes,
|
|
long blockSize,
|
|
long blockSize,
|
|
- int maxNodesPerRack,
|
|
|
|
- boolean considerLoad,
|
|
|
|
- List<DatanodeStorageInfo> results,
|
|
|
|
- boolean avoidStaleNodes,
|
|
|
|
|
|
+ List<DatanodeStorageInfo> results,
|
|
StorageType storageType) {
|
|
StorageType storageType) {
|
|
- if (isGoodTarget(storage, blockSize, maxNodesPerRack, considerLoad,
|
|
|
|
- results, avoidStaleNodes, storageType)) {
|
|
|
|
|
|
+ if (isGoodTarget(storage, blockSize, results, storageType)) {
|
|
results.add(storage);
|
|
results.add(storage);
|
|
// add node and related nodes to excludedNode
|
|
// add node and related nodes to excludedNode
|
|
return addToExcludedNodes(storage.getDatanodeDescriptor(), excludedNodes);
|
|
return addToExcludedNodes(storage.getDatanodeDescriptor(), excludedNodes);
|
|
@@ -749,75 +768,52 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private static void logNodeIsNotChosen(DatanodeDescriptor node,
|
|
|
|
+ String reason) {
|
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
|
+ // build the error message for later use.
|
|
|
|
+ debugLoggingBuilder.get()
|
|
|
|
+ .append("\n Datanode ").append(node)
|
|
|
|
+ .append(" is not chosen since ").append(reason).append(".");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
- * Determine if a storage is a good target.
|
|
|
|
- *
|
|
|
|
- * @param storage The target storage
|
|
|
|
- * @param blockSize Size of block
|
|
|
|
- * @param maxTargetPerRack Maximum number of targets per rack. The value of
|
|
|
|
- * this parameter depends on the number of racks in
|
|
|
|
|
|
+ * Determine if a datanode is good for placing block.
|
|
|
|
+ *
|
|
|
|
+ * @param node The target datanode
|
|
|
|
+ * @param maxTargetPerRack Maximum number of targets per rack. The value of
|
|
|
|
+ * this parameter depends on the number of racks in
|
|
* the cluster and total number of replicas for a block
|
|
* the cluster and total number of replicas for a block
|
|
* @param considerLoad whether or not to consider load of the target node
|
|
* @param considerLoad whether or not to consider load of the target node
|
|
- * @param results A list containing currently chosen nodes. Used to check if
|
|
|
|
|
|
+ * @param results A list containing currently chosen nodes. Used to check if
|
|
* too many nodes has been chosen in the target rack.
|
|
* too many nodes has been chosen in the target rack.
|
|
* @param avoidStaleNodes Whether or not to avoid choosing stale nodes
|
|
* @param avoidStaleNodes Whether or not to avoid choosing stale nodes
|
|
- * @return Return true if <i>node</i> has enough space,
|
|
|
|
- * does not have too much load,
|
|
|
|
- * and the rack does not have too many nodes.
|
|
|
|
|
|
+ * @return Reture true if the datanode is good candidate, otherwise false
|
|
*/
|
|
*/
|
|
- private boolean isGoodTarget(DatanodeStorageInfo storage,
|
|
|
|
- long blockSize, int maxTargetPerRack,
|
|
|
|
- boolean considerLoad,
|
|
|
|
- List<DatanodeStorageInfo> results,
|
|
|
|
- boolean avoidStaleNodes,
|
|
|
|
- StorageType requiredStorageType) {
|
|
|
|
- if (storage.getStorageType() != requiredStorageType) {
|
|
|
|
- logNodeIsNotChosen(storage, "storage types do not match,"
|
|
|
|
- + " where the required storage type is " + requiredStorageType);
|
|
|
|
- return false;
|
|
|
|
- }
|
|
|
|
- if (storage.getState() == State.READ_ONLY_SHARED) {
|
|
|
|
- logNodeIsNotChosen(storage, "storage is read-only");
|
|
|
|
- return false;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if (storage.getState() == State.FAILED) {
|
|
|
|
- logNodeIsNotChosen(storage, "storage has failed");
|
|
|
|
- return false;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- DatanodeDescriptor node = storage.getDatanodeDescriptor();
|
|
|
|
|
|
+ boolean isGoodDatanode(DatanodeDescriptor node,
|
|
|
|
+ int maxTargetPerRack, boolean considerLoad,
|
|
|
|
+ List<DatanodeStorageInfo> results,
|
|
|
|
+ boolean avoidStaleNodes) {
|
|
// check if the node is (being) decommissioned
|
|
// check if the node is (being) decommissioned
|
|
if (node.isDecommissionInProgress() || node.isDecommissioned()) {
|
|
if (node.isDecommissionInProgress() || node.isDecommissioned()) {
|
|
- logNodeIsNotChosen(storage, "the node is (being) decommissioned ");
|
|
|
|
|
|
+ logNodeIsNotChosen(node, "the node is (being) decommissioned ");
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|
|
|
|
|
|
if (avoidStaleNodes) {
|
|
if (avoidStaleNodes) {
|
|
if (node.isStale(this.staleInterval)) {
|
|
if (node.isStale(this.staleInterval)) {
|
|
- logNodeIsNotChosen(storage, "the node is stale ");
|
|
|
|
|
|
+ logNodeIsNotChosen(node, "the node is stale ");
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
- final long requiredSize = blockSize * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE;
|
|
|
|
- final long scheduledSize = blockSize * node.getBlocksScheduled(storage.getStorageType());
|
|
|
|
- final long remaining = node.getRemaining(storage.getStorageType());
|
|
|
|
- if (requiredSize > remaining - scheduledSize) {
|
|
|
|
- logNodeIsNotChosen(storage, "the node does not have enough "
|
|
|
|
- + storage.getStorageType() + " space"
|
|
|
|
- + " (required=" + requiredSize
|
|
|
|
- + ", scheduled=" + scheduledSize
|
|
|
|
- + ", remaining=" + remaining + ")");
|
|
|
|
- return false;
|
|
|
|
- }
|
|
|
|
|
|
|
|
// check the communication traffic of the target machine
|
|
// check the communication traffic of the target machine
|
|
if (considerLoad) {
|
|
if (considerLoad) {
|
|
final double maxLoad = 2.0 * stats.getInServiceXceiverAverage();
|
|
final double maxLoad = 2.0 * stats.getInServiceXceiverAverage();
|
|
final int nodeLoad = node.getXceiverCount();
|
|
final int nodeLoad = node.getXceiverCount();
|
|
if (nodeLoad > maxLoad) {
|
|
if (nodeLoad > maxLoad) {
|
|
- logNodeIsNotChosen(storage, "the node is too busy (load: " + nodeLoad
|
|
|
|
|
|
+ logNodeIsNotChosen(node, "the node is too busy (load: " + nodeLoad
|
|
+ " > " + maxLoad + ") ");
|
|
+ " > " + maxLoad + ") ");
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|
|
@@ -832,10 +828,56 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
counter++;
|
|
counter++;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- if (counter>maxTargetPerRack) {
|
|
|
|
- logNodeIsNotChosen(storage, "the rack has too many chosen nodes ");
|
|
|
|
|
|
+ if (counter > maxTargetPerRack) {
|
|
|
|
+ logNodeIsNotChosen(node, "the rack has too many chosen nodes ");
|
|
|
|
+ return false;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return true;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Determine if a storage is a good target.
|
|
|
|
+ *
|
|
|
|
+ * @param storage The target storage
|
|
|
|
+ * @param blockSize Size of block
|
|
|
|
+ * @param results A list containing currently chosen nodes. Used to check if
|
|
|
|
+ * too many nodes has been chosen in the target rack.
|
|
|
|
+ * @return Return true if <i>node</i> has enough space.
|
|
|
|
+ */
|
|
|
|
+ private boolean isGoodTarget(DatanodeStorageInfo storage,
|
|
|
|
+ long blockSize,
|
|
|
|
+ List<DatanodeStorageInfo> results,
|
|
|
|
+ StorageType requiredStorageType) {
|
|
|
|
+ if (storage.getStorageType() != requiredStorageType) {
|
|
|
|
+ logNodeIsNotChosen(storage, "storage types do not match,"
|
|
|
|
+ + " where the required storage type is " + requiredStorageType);
|
|
|
|
+ return false;
|
|
|
|
+ }
|
|
|
|
+ if (storage.getState() == State.READ_ONLY_SHARED) {
|
|
|
|
+ logNodeIsNotChosen(storage, "storage is read-only");
|
|
|
|
+ return false;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (storage.getState() == State.FAILED) {
|
|
|
|
+ logNodeIsNotChosen(storage, "storage has failed");
|
|
|
|
+ return false;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ DatanodeDescriptor node = storage.getDatanodeDescriptor();
|
|
|
|
+
|
|
|
|
+ final long requiredSize = blockSize * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE;
|
|
|
|
+ final long scheduledSize = blockSize * node.getBlocksScheduled(storage.getStorageType());
|
|
|
|
+ final long remaining = node.getRemaining(storage.getStorageType());
|
|
|
|
+ if (requiredSize > remaining - scheduledSize) {
|
|
|
|
+ logNodeIsNotChosen(storage, "the node does not have enough "
|
|
|
|
+ + storage.getStorageType() + " space"
|
|
|
|
+ + " (required=" + requiredSize
|
|
|
|
+ + ", scheduled=" + scheduledSize
|
|
|
|
+ + ", remaining=" + remaining + ")");
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|
|
|
|
+
|
|
return true;
|
|
return true;
|
|
}
|
|
}
|
|
|
|
|