|
@@ -21,6 +21,7 @@ import static org.apache.hadoop.util.Time.now;
|
|
|
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
|
+import java.util.EnumSet;
|
|
|
import java.util.HashSet;
|
|
|
import java.util.List;
|
|
|
import java.util.Set;
|
|
@@ -218,7 +219,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
|
boolean avoidStaleNodes = (stats != null
|
|
|
&& stats.isAvoidingStaleDataNodesForWrite());
|
|
|
final Node localNode = chooseTarget(numOfReplicas, writer, excludedNodes,
|
|
|
- blocksize, maxNodesPerRack, results, avoidStaleNodes, storagePolicy);
|
|
|
+ blocksize, maxNodesPerRack, results, avoidStaleNodes, storagePolicy,
|
|
|
+ EnumSet.noneOf(StorageType.class), results.isEmpty());
|
|
|
if (!returnChosenNodes) {
|
|
|
results.removeAll(chosenStorage);
|
|
|
}
|
|
@@ -238,7 +240,40 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
|
int maxNodesPerRack = (totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2;
|
|
|
return new int[] {numOfReplicas, maxNodesPerRack};
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ private static List<StorageType> selectStorageTypes(
|
|
|
+ final BlockStoragePolicy storagePolicy,
|
|
|
+ final short replication,
|
|
|
+ final Iterable<StorageType> chosen,
|
|
|
+ final EnumSet<StorageType> unavailableStorages,
|
|
|
+ final boolean isNewBlock) {
|
|
|
+ final List<StorageType> storageTypes = storagePolicy.chooseStorageTypes(
|
|
|
+ replication, chosen);
|
|
|
+ final List<StorageType> removed = new ArrayList<StorageType>();
|
|
|
+ for(int i = storageTypes.size() - 1; i >= 0; i--) {
|
|
|
+ // replace/remove unavailable storage types.
|
|
|
+ final StorageType t = storageTypes.get(i);
|
|
|
+ if (unavailableStorages.contains(t)) {
|
|
|
+ final StorageType fallback = isNewBlock?
|
|
|
+ storagePolicy.getCreationFallback(unavailableStorages)
|
|
|
+ : storagePolicy.getReplicationFallback(unavailableStorages);
|
|
|
+ if (fallback == null) {
|
|
|
+ removed.add(storageTypes.remove(i));
|
|
|
+ } else {
|
|
|
+ storageTypes.set(i, fallback);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (storageTypes.size() < replication) {
|
|
|
+ LOG.warn("Failed to place enough replicas: replication is " + replication
|
|
|
+ + " but only " + storageTypes.size() + " storage types can be selected "
|
|
|
+ + "(selected=" + storageTypes
|
|
|
+ + ", unavailable=" + unavailableStorages
|
|
|
+ + ", removed=" + removed
|
|
|
+ + ", policy=" + storagePolicy + ")");
|
|
|
+ }
|
|
|
+ return storageTypes;
|
|
|
+ }
|
|
|
/**
|
|
|
* choose <i>numOfReplicas</i> from all data nodes
|
|
|
* @param numOfReplicas additional number of replicas wanted
|
|
@@ -257,14 +292,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
|
final int maxNodesPerRack,
|
|
|
final List<DatanodeStorageInfo> results,
|
|
|
final boolean avoidStaleNodes,
|
|
|
- final BlockStoragePolicy storagePolicy) {
|
|
|
+ final BlockStoragePolicy storagePolicy,
|
|
|
+ final EnumSet<StorageType> unavailableStorages,
|
|
|
+ final boolean newBlock) {
|
|
|
if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
|
|
|
return writer;
|
|
|
}
|
|
|
- int totalReplicasExpected = numOfReplicas + results.size();
|
|
|
-
|
|
|
- int numOfResults = results.size();
|
|
|
- boolean newBlock = (numOfResults==0);
|
|
|
+ final int numOfResults = results.size();
|
|
|
+ final int totalReplicasExpected = numOfReplicas + numOfResults;
|
|
|
if ((writer == null || !(writer instanceof DatanodeDescriptor)) && !newBlock) {
|
|
|
writer = results.get(0).getDatanodeDescriptor();
|
|
|
}
|
|
@@ -272,12 +307,25 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
|
// Keep a copy of original excludedNodes
|
|
|
final Set<Node> oldExcludedNodes = avoidStaleNodes ?
|
|
|
new HashSet<Node>(excludedNodes) : null;
|
|
|
- final List<StorageType> storageTypes = storagePolicy.chooseStorageTypes(
|
|
|
- (short)totalReplicasExpected, DatanodeStorageInfo.toStorageTypes(results));
|
|
|
+
|
|
|
+ // choose storage types; use fallbacks for unavailable storages
|
|
|
+ final List<StorageType> storageTypes = selectStorageTypes(storagePolicy,
|
|
|
+ (short)totalReplicasExpected, DatanodeStorageInfo.toStorageTypes(results),
|
|
|
+ unavailableStorages, newBlock);
|
|
|
+
|
|
|
+ StorageType curStorageType = null;
|
|
|
try {
|
|
|
+ if ((numOfReplicas = storageTypes.size()) == 0) {
|
|
|
+ throw new NotEnoughReplicasException(
|
|
|
+ "All required storage types are unavailable: "
|
|
|
+ + " unavailableStorages=" + unavailableStorages
|
|
|
+ + ", storagePolicy=" + storagePolicy);
|
|
|
+ }
|
|
|
+
|
|
|
if (numOfResults == 0) {
|
|
|
+ curStorageType = storageTypes.remove(0);
|
|
|
writer = chooseLocalStorage(writer, excludedNodes, blocksize,
|
|
|
- maxNodesPerRack, results, avoidStaleNodes, storageTypes.remove(0), true)
|
|
|
+ maxNodesPerRack, results, avoidStaleNodes, curStorageType, true)
|
|
|
.getDatanodeDescriptor();
|
|
|
if (--numOfReplicas == 0) {
|
|
|
return writer;
|
|
@@ -285,30 +333,33 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
|
}
|
|
|
final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor();
|
|
|
if (numOfResults <= 1) {
|
|
|
+ curStorageType = storageTypes.remove(0);
|
|
|
chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
|
|
|
- results, avoidStaleNodes, storageTypes.remove(0));
|
|
|
+ results, avoidStaleNodes, curStorageType);
|
|
|
if (--numOfReplicas == 0) {
|
|
|
return writer;
|
|
|
}
|
|
|
}
|
|
|
if (numOfResults <= 2) {
|
|
|
final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor();
|
|
|
+ curStorageType = storageTypes.remove(0);
|
|
|
if (clusterMap.isOnSameRack(dn0, dn1)) {
|
|
|
chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
|
|
|
- results, avoidStaleNodes, storageTypes.remove(0));
|
|
|
+ results, avoidStaleNodes, curStorageType);
|
|
|
} else if (newBlock){
|
|
|
chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack,
|
|
|
- results, avoidStaleNodes, storageTypes.remove(0));
|
|
|
+ results, avoidStaleNodes, curStorageType);
|
|
|
} else {
|
|
|
chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
|
|
|
- results, avoidStaleNodes, storageTypes.remove(0));
|
|
|
+ results, avoidStaleNodes, curStorageType);
|
|
|
}
|
|
|
if (--numOfReplicas == 0) {
|
|
|
return writer;
|
|
|
}
|
|
|
}
|
|
|
+ curStorageType = storageTypes.remove(0);
|
|
|
chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
|
|
|
- maxNodesPerRack, results, avoidStaleNodes, storageTypes.remove(0));
|
|
|
+ maxNodesPerRack, results, avoidStaleNodes, curStorageType);
|
|
|
} catch (NotEnoughReplicasException e) {
|
|
|
final String message = "Failed to place enough replicas, still in need of "
|
|
|
+ (totalReplicasExpected - results.size()) + " to reach "
|
|
@@ -333,7 +384,16 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|
|
// if the NotEnoughReplicasException was thrown in chooseRandom().
|
|
|
numOfReplicas = totalReplicasExpected - results.size();
|
|
|
return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
|
|
|
- maxNodesPerRack, results, false, storagePolicy);
|
|
|
+ maxNodesPerRack, results, false, storagePolicy, unavailableStorages,
|
|
|
+ newBlock);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (storageTypes.size() > 0) {
|
|
|
+ // Retry chooseTarget with fallback storage types
|
|
|
+ unavailableStorages.add(curStorageType);
|
|
|
+ return chooseTarget(numOfReplicas, writer, excludedNodes, blocksize,
|
|
|
+ maxNodesPerRack, results, false, storagePolicy, unavailableStorages,
|
|
|
+ newBlock);
|
|
|
}
|
|
|
}
|
|
|
return writer;
|