|
@@ -29,6 +29,7 @@ import org.apache.hadoop.ipc.Server;
|
|
|
|
|
|
import java.io.*;
|
|
|
import java.util.*;
|
|
|
+import java.util.Map.Entry;
|
|
|
|
|
|
/***************************************************
|
|
|
* FSNamesystem does the actual bookkeeping work for the
|
|
@@ -2179,18 +2180,57 @@ class FSNamesystem implements FSConstants {
|
|
|
*
|
|
|
* srcNodes.size() - dstNodes.size() == replication
|
|
|
*
|
|
|
- * We pick node with least free space
|
|
|
- * In the future, we might enforce some kind of policy
|
|
|
- * (like making sure replicates are spread across racks).
|
|
|
+ * We pick node that make sure that replicas are spread across racks and
|
|
|
+ * also try hard to pick one with least free space.
|
|
|
+ * The algorithm is first to pick a node with least free space from nodes
|
|
|
+ * that are on a rack holding more than one replicas of the block.
|
|
|
+ * So removing such a replica won't remove a rack.
|
|
|
+ * If no such a node is available,
|
|
|
+ * then pick a node with least free space
|
|
|
*/
|
|
|
void chooseExcessReplicates(Collection<DatanodeDescriptor> nonExcess,
|
|
|
Block b, short replication) {
|
|
|
+ // first form a rack to datanodes map and
|
|
|
+ HashMap<String, ArrayList<DatanodeDescriptor>> rackMap =
|
|
|
+ new HashMap<String, ArrayList<DatanodeDescriptor>>();
|
|
|
+ for (Iterator<DatanodeDescriptor> iter = nonExcess.iterator();
|
|
|
+ iter.hasNext();) {
|
|
|
+ DatanodeDescriptor node = iter.next();
|
|
|
+ String rackName = node.getNetworkLocation();
|
|
|
+ ArrayList<DatanodeDescriptor> datanodeList = rackMap.get(rackName);
|
|
|
+ if(datanodeList==null) {
|
|
|
+ datanodeList = new ArrayList<DatanodeDescriptor>();
|
|
|
+ }
|
|
|
+ datanodeList.add(node);
|
|
|
+ rackMap.put(rackName, datanodeList);
|
|
|
+ }
|
|
|
+
|
|
|
+ // split nodes into two sets
|
|
|
+ // priSet contains nodes on rack with more than one replica
|
|
|
+ // remains contains the remaining nodes
|
|
|
+ ArrayList<DatanodeDescriptor> priSet = new ArrayList<DatanodeDescriptor>();
|
|
|
+ ArrayList<DatanodeDescriptor> remains = new ArrayList<DatanodeDescriptor>();
|
|
|
+ for( Iterator<Entry<String, ArrayList<DatanodeDescriptor>>> iter =
|
|
|
+ rackMap.entrySet().iterator(); iter.hasNext(); ) {
|
|
|
+ Entry<String, ArrayList<DatanodeDescriptor>> rackEntry = iter.next();
|
|
|
+ ArrayList<DatanodeDescriptor> datanodeList = rackEntry.getValue();
|
|
|
+ if( datanodeList.size() == 1 ) {
|
|
|
+ remains.add(datanodeList.get(0));
|
|
|
+ } else {
|
|
|
+ priSet.addAll(datanodeList);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // pick one node with least space from priSet if it is not empty
|
|
|
+ // otherwise one node with least space from remains
|
|
|
while (nonExcess.size() - replication > 0) {
|
|
|
DatanodeInfo cur = null;
|
|
|
long minSpace = Long.MAX_VALUE;
|
|
|
-
|
|
|
- for (Iterator<DatanodeDescriptor> iter = nonExcess.iterator(); iter.hasNext();) {
|
|
|
- DatanodeInfo node = iter.next();
|
|
|
+
|
|
|
+ Iterator<DatanodeDescriptor> iter =
|
|
|
+ priSet.isEmpty() ? remains.iterator() : priSet.iterator();
|
|
|
+ while( iter.hasNext() ) {
|
|
|
+ DatanodeDescriptor node = iter.next();
|
|
|
long free = node.getRemaining();
|
|
|
|
|
|
if (minSpace > free) {
|
|
@@ -2198,7 +2238,24 @@ class FSNamesystem implements FSConstants {
|
|
|
cur = node;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ // adjust rackmap, priSet, and remains
|
|
|
+ String rack = cur.getNetworkLocation();
|
|
|
+ ArrayList<DatanodeDescriptor> datanodes = rackMap.get(rack);
|
|
|
+ datanodes.remove(cur);
|
|
|
+ if(datanodes.isEmpty()) {
|
|
|
+ rackMap.remove(rack);
|
|
|
+ }
|
|
|
+ if (priSet.isEmpty()) {
|
|
|
+ remains.remove(cur);
|
|
|
+ } else {
|
|
|
+ priSet.remove(cur);
|
|
|
+ if (datanodes.size() == 1) {
|
|
|
+ priSet.remove(datanodes.get(0));
|
|
|
+ remains.add(datanodes.get(0));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
nonExcess.remove(cur);
|
|
|
|
|
|
Collection<Block> excessBlocks = excessReplicateMap.get(cur.getStorageID());
|