|
@@ -22,7 +22,10 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVAILABLE_SPACE_
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVAILABLE_SPACE_BLOCK_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_DEFAULT;
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVAILABLE_SPACE_BLOCK_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_DEFAULT;
|
|
|
|
|
|
import java.util.Collection;
|
|
import java.util.Collection;
|
|
|
|
+import java.util.EnumMap;
|
|
|
|
+import java.util.List;
|
|
import java.util.Random;
|
|
import java.util.Random;
|
|
|
|
+import java.util.Set;
|
|
|
|
|
|
import com.google.common.base.Preconditions;
|
|
import com.google.common.base.Preconditions;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
@@ -44,6 +47,7 @@ public class AvailableSpaceBlockPlacementPolicy extends
|
|
private static final Random RAND = new Random();
|
|
private static final Random RAND = new Random();
|
|
private int balancedPreference =
|
|
private int balancedPreference =
|
|
(int) (100 * DFS_NAMENODE_AVAILABLE_SPACE_BLOCK_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_DEFAULT);
|
|
(int) (100 * DFS_NAMENODE_AVAILABLE_SPACE_BLOCK_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_DEFAULT);
|
|
|
|
+ private boolean optimizeLocal;
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void initialize(Configuration conf, FSClusterStats stats,
|
|
public void initialize(Configuration conf, FSClusterStats stats,
|
|
@@ -58,6 +62,10 @@ public class AvailableSpaceBlockPlacementPolicy extends
|
|
+ DFSConfigKeys.DFS_NAMENODE_AVAILABLE_SPACE_BLOCK_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY
|
|
+ DFSConfigKeys.DFS_NAMENODE_AVAILABLE_SPACE_BLOCK_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY
|
|
+ " = " + balancedPreferencePercent);
|
|
+ " = " + balancedPreferencePercent);
|
|
|
|
|
|
|
|
+ optimizeLocal = conf.getBoolean(
|
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_AVAILABLE_SPACE_BLOCK_PLACEMENT_POLICY_BALANCE_LOCAL_NODE_KEY,
|
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_AVAILABLE_SPACE_BLOCK_PLACEMENT_POLICY_BALANCE_LOCAL_NODE_DEFAULT);
|
|
|
|
+
|
|
if (balancedPreferencePercent > 1.0) {
|
|
if (balancedPreferencePercent > 1.0) {
|
|
LOG.warn("The value of "
|
|
LOG.warn("The value of "
|
|
+ DFS_NAMENODE_AVAILABLE_SPACE_BLOCK_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY
|
|
+ DFS_NAMENODE_AVAILABLE_SPACE_BLOCK_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY
|
|
@@ -82,7 +90,65 @@ public class AvailableSpaceBlockPlacementPolicy extends
|
|
.chooseRandomWithStorageType(scope, excludedNode, type);
|
|
.chooseRandomWithStorageType(scope, excludedNode, type);
|
|
DatanodeDescriptor b = (DatanodeDescriptor) dfsClusterMap
|
|
DatanodeDescriptor b = (DatanodeDescriptor) dfsClusterMap
|
|
.chooseRandomWithStorageType(scope, excludedNode, type);
|
|
.chooseRandomWithStorageType(scope, excludedNode, type);
|
|
- return select(a, b);
|
|
|
|
|
|
+ return select(a, b, false);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ protected DatanodeStorageInfo chooseLocalStorage(Node localMachine,
|
|
|
|
+ Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
|
|
|
|
+ List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
|
|
|
|
+ EnumMap<StorageType, Integer> storageTypes, boolean fallbackToLocalRack)
|
|
|
|
+ throws NotEnoughReplicasException {
|
|
|
|
+ if (!optimizeLocal) {
|
|
|
|
+ return super.chooseLocalStorage(localMachine, excludedNodes, blocksize,
|
|
|
|
+ maxNodesPerRack, results, avoidStaleNodes, storageTypes,
|
|
|
|
+ fallbackToLocalRack);
|
|
|
|
+ }
|
|
|
|
+ final EnumMap<StorageType, Integer> initialStorageTypesLocal =
|
|
|
|
+ storageTypes.clone();
|
|
|
|
+ final EnumMap<StorageType, Integer> initialStorageTypesLocalRack =
|
|
|
|
+ storageTypes.clone();
|
|
|
|
+ DatanodeStorageInfo local =
|
|
|
|
+ chooseLocalStorage(localMachine, excludedNodes, blocksize,
|
|
|
|
+ maxNodesPerRack, results, avoidStaleNodes,
|
|
|
|
+ initialStorageTypesLocal);
|
|
|
|
+ if (!fallbackToLocalRack) {
|
|
|
|
+ return local;
|
|
|
|
+ }
|
|
|
|
+ if (local != null) {
|
|
|
|
+ results.remove(local);
|
|
|
|
+ }
|
|
|
|
+ DatanodeStorageInfo localRack =
|
|
|
|
+ chooseLocalRack(localMachine, excludedNodes, blocksize, maxNodesPerRack,
|
|
|
|
+ results, avoidStaleNodes, initialStorageTypesLocalRack);
|
|
|
|
+ if (local != null && localRack != null) {
|
|
|
|
+ if (select(local.getDatanodeDescriptor(),
|
|
|
|
+ localRack.getDatanodeDescriptor(), true) == local
|
|
|
|
+ .getDatanodeDescriptor()) {
|
|
|
|
+ results.remove(localRack);
|
|
|
|
+ results.add(local);
|
|
|
|
+ swapStorageTypes(initialStorageTypesLocal, storageTypes);
|
|
|
|
+ excludedNodes.remove(localRack.getDatanodeDescriptor());
|
|
|
|
+ return local;
|
|
|
|
+ } else {
|
|
|
|
+ swapStorageTypes(initialStorageTypesLocalRack, storageTypes);
|
|
|
|
+ excludedNodes.remove(local.getDatanodeDescriptor());
|
|
|
|
+ return localRack;
|
|
|
|
+ }
|
|
|
|
+ } else if (localRack == null && local != null) {
|
|
|
|
+ results.add(local);
|
|
|
|
+ swapStorageTypes(initialStorageTypesLocal, storageTypes);
|
|
|
|
+ return local;
|
|
|
|
+ } else {
|
|
|
|
+ swapStorageTypes(initialStorageTypesLocalRack, storageTypes);
|
|
|
|
+ return localRack;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void swapStorageTypes(EnumMap<StorageType, Integer> fromStorageTypes,
|
|
|
|
+ EnumMap<StorageType, Integer> toStorageTypes) {
|
|
|
|
+ toStorageTypes.clear();
|
|
|
|
+ toStorageTypes.putAll(fromStorageTypes);
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -92,13 +158,13 @@ public class AvailableSpaceBlockPlacementPolicy extends
|
|
(DatanodeDescriptor) clusterMap.chooseRandom(scope, excludedNode);
|
|
(DatanodeDescriptor) clusterMap.chooseRandom(scope, excludedNode);
|
|
DatanodeDescriptor b =
|
|
DatanodeDescriptor b =
|
|
(DatanodeDescriptor) clusterMap.chooseRandom(scope, excludedNode);
|
|
(DatanodeDescriptor) clusterMap.chooseRandom(scope, excludedNode);
|
|
- return select(a, b);
|
|
|
|
|
|
+ return select(a, b, false);
|
|
}
|
|
}
|
|
|
|
|
|
- private DatanodeDescriptor select(
|
|
|
|
- DatanodeDescriptor a, DatanodeDescriptor b) {
|
|
|
|
|
|
+ private DatanodeDescriptor select(DatanodeDescriptor a, DatanodeDescriptor b,
|
|
|
|
+ boolean isBalanceLocal) {
|
|
if (a != null && b != null){
|
|
if (a != null && b != null){
|
|
- int ret = compareDataNode(a, b);
|
|
|
|
|
|
+ int ret = compareDataNode(a, b, isBalanceLocal);
|
|
if (ret == 0) {
|
|
if (ret == 0) {
|
|
return a;
|
|
return a;
|
|
} else if (ret < 0) {
|
|
} else if (ret < 0) {
|
|
@@ -115,9 +181,10 @@ public class AvailableSpaceBlockPlacementPolicy extends
|
|
* Compare the two data nodes.
|
|
* Compare the two data nodes.
|
|
*/
|
|
*/
|
|
protected int compareDataNode(final DatanodeDescriptor a,
|
|
protected int compareDataNode(final DatanodeDescriptor a,
|
|
- final DatanodeDescriptor b) {
|
|
|
|
|
|
+ final DatanodeDescriptor b, boolean isBalanceLocal) {
|
|
if (a.equals(b)
|
|
if (a.equals(b)
|
|
- || Math.abs(a.getDfsUsedPercent() - b.getDfsUsedPercent()) < 5) {
|
|
|
|
|
|
+ || Math.abs(a.getDfsUsedPercent() - b.getDfsUsedPercent()) < 5 || ((
|
|
|
|
+ isBalanceLocal && a.getDfsUsedPercent() < 50))) {
|
|
return 0;
|
|
return 0;
|
|
}
|
|
}
|
|
return a.getDfsUsedPercent() < b.getDfsUsedPercent() ? -1 : 1;
|
|
return a.getDfsUsedPercent() < b.getDfsUsedPercent() ? -1 : 1;
|