|
@@ -105,6 +105,9 @@ public class BlockManager {
|
|
|
// Default number of replicas
|
|
|
int defaultReplication;
|
|
|
|
|
|
+ // Whether to verify that a block's replicas span enough racks
|
|
|
+ boolean shouldCheckForEnoughRacks = true;
|
|
|
+
|
|
|
/**
|
|
|
* Last block index used for replication work.
|
|
|
*/
|
|
@@ -155,10 +158,13 @@ public class BlockManager {
|
|
|
+ " must be less than dfs.replication.max = "
|
|
|
+ maxReplication);
|
|
|
this.maxReplicationStreams = conf.getInt("dfs.max-repl-streams", 2);
|
|
|
+ this.shouldCheckForEnoughRacks = conf.get("topology.script.file.name") == null ? false
|
|
|
+ : true;
|
|
|
FSNamesystem.LOG.info("defaultReplication = " + defaultReplication);
|
|
|
FSNamesystem.LOG.info("maxReplication = " + maxReplication);
|
|
|
FSNamesystem.LOG.info("minReplication = " + minReplication);
|
|
|
FSNamesystem.LOG.info("maxReplicationStreams = " + maxReplicationStreams);
|
|
|
+ FSNamesystem.LOG.info("shouldCheckForEnoughRacks = " + shouldCheckForEnoughRacks);
|
|
|
}
|
|
|
|
|
|
void activate() {
|
|
@@ -677,6 +683,7 @@ public class BlockManager {
|
|
|
int requiredReplication, numEffectiveReplicas;
|
|
|
List<DatanodeDescriptor> containingNodes;
|
|
|
DatanodeDescriptor srcNode;
|
|
|
+ int additionalReplRequired;
|
|
|
|
|
|
synchronized (namesystem) {
|
|
|
synchronized (neededReplications) {
|
|
@@ -688,6 +695,7 @@ public class BlockManager {
|
|
|
replIndex--;
|
|
|
return false;
|
|
|
}
|
|
|
+
|
|
|
requiredReplication = fileINode.getReplication();
|
|
|
|
|
|
// get a source data-node
|
|
@@ -704,21 +712,32 @@ public class BlockManager {
|
|
|
// do not schedule more if enough replicas is already pending
|
|
|
numEffectiveReplicas = numReplicas.liveReplicas() +
|
|
|
pendingReplications.getNumReplicas(block);
|
|
|
- if(numEffectiveReplicas >= requiredReplication) {
|
|
|
- neededReplications.remove(block, priority); // remove from neededReplications
|
|
|
- replIndex--;
|
|
|
- NameNode.stateChangeLog.info("BLOCK* "
|
|
|
- + "Removing block " + block
|
|
|
- + " from neededReplications as it has enough replicas.");
|
|
|
- return false;
|
|
|
+
|
|
|
+ if (numEffectiveReplicas >= requiredReplication) {
|
|
|
+ if ( (pendingReplications.getNumReplicas(block) > 0) ||
|
|
|
+ (blockHasEnoughRacks(block)) ) {
|
|
|
+ neededReplications.remove(block, priority); // remove from neededReplications
|
|
|
+ replIndex--;
|
|
|
+ NameNode.stateChangeLog.info("BLOCK* "
|
|
|
+ + "Removing block " + block
|
|
|
+ + " from neededReplications as it has enough replicas.");
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (numReplicas.liveReplicas() < requiredReplication) {
|
|
|
+ additionalReplRequired = requiredReplication - numEffectiveReplicas;
|
|
|
+ } else {
|
|
|
+ additionalReplRequired = 1; //Needed on a new rack
|
|
|
}
|
|
|
+
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// choose replication targets: NOT HOLDING THE GLOBAL LOCK
|
|
|
- DatanodeDescriptor targets[] = replicator.chooseTarget(
|
|
|
- requiredReplication - numEffectiveReplicas,
|
|
|
- srcNode, containingNodes, null, block.getNumBytes());
|
|
|
+ DatanodeDescriptor targets[] =
|
|
|
+ replicator.chooseTarget(additionalReplRequired,
|
|
|
+ srcNode, containingNodes, null, block.getNumBytes());
|
|
|
if(targets.length == 0)
|
|
|
return false;
|
|
|
|
|
@@ -739,13 +758,25 @@ public class BlockManager {
|
|
|
NumberReplicas numReplicas = countNodes(block);
|
|
|
numEffectiveReplicas = numReplicas.liveReplicas() +
|
|
|
pendingReplications.getNumReplicas(block);
|
|
|
- if(numEffectiveReplicas >= requiredReplication) {
|
|
|
- neededReplications.remove(block, priority); // remove from neededReplications
|
|
|
- replIndex--;
|
|
|
- NameNode.stateChangeLog.info("BLOCK* "
|
|
|
- + "Removing block " + block
|
|
|
- + " from neededReplications as it has enough replicas.");
|
|
|
- return false;
|
|
|
+
|
|
|
+ if (numEffectiveReplicas >= requiredReplication) {
|
|
|
+ if ( (pendingReplications.getNumReplicas(block) > 0) ||
|
|
|
+ (blockHasEnoughRacks(block)) ) {
|
|
|
+ neededReplications.remove(block, priority); // remove from neededReplications
|
|
|
+ replIndex--;
|
|
|
+ NameNode.stateChangeLog.info("BLOCK* "
|
|
|
+ + "Removing block " + block
|
|
|
+ + " from neededReplications as it has enough replicas.");
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if ( (numReplicas.liveReplicas() >= requiredReplication) &&
|
|
|
+ (!blockHasEnoughRacks(block)) ) {
|
|
|
+ if (srcNode.getNetworkLocation().equals(targets[0].getNetworkLocation())) {
|
|
|
+ //No use continuing, unless a new rack in this case
|
|
|
+ return false;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// Add block to the to be replicated list
|
|
@@ -867,10 +898,13 @@ public class BlockManager {
|
|
|
synchronized (namesystem) {
|
|
|
for (int i = 0; i < timedOutItems.length; i++) {
|
|
|
NumberReplicas num = countNodes(timedOutItems[i]);
|
|
|
- neededReplications.add(timedOutItems[i],
|
|
|
- num.liveReplicas(),
|
|
|
- num.decommissionedReplicas(),
|
|
|
- getReplication(timedOutItems[i]));
|
|
|
+ if (isNeededReplication(timedOutItems[i], getReplication(timedOutItems[i]),
|
|
|
+ num.liveReplicas())) {
|
|
|
+ neededReplications.add(timedOutItems[i],
|
|
|
+ num.liveReplicas(),
|
|
|
+ num.decommissionedReplicas(),
|
|
|
+ getReplication(timedOutItems[i]));
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
/* If we know the target datanodes where the replication timedout,
|
|
@@ -1122,9 +1156,11 @@ public class BlockManager {
|
|
|
NumberReplicas num = countNodes(block);
|
|
|
int numCurrentReplica = num.liveReplicas();
|
|
|
// add to under-replicated queue if need to be
|
|
|
- if (neededReplications.add(block, numCurrentReplica, num
|
|
|
- .decommissionedReplicas(), expectedReplication)) {
|
|
|
- nrUnderReplicated++;
|
|
|
+ if (isNeededReplication(block, expectedReplication, numCurrentReplica)) {
|
|
|
+ if (neededReplications.add(block, numCurrentReplica, num
|
|
|
+ .decommissionedReplicas(), expectedReplication)) {
|
|
|
+ nrUnderReplicated++;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
if (numCurrentReplica > expectedReplication) {
|
|
@@ -1303,8 +1339,11 @@ public class BlockManager {
|
|
|
NumberReplicas num = countNodes(block);
|
|
|
int curReplicas = num.liveReplicas();
|
|
|
int curExpectedReplicas = getReplication(block);
|
|
|
- if (curExpectedReplicas > curReplicas) {
|
|
|
- status = true;
|
|
|
+ if (isNeededReplication(block, curExpectedReplicas, curReplicas)) {
|
|
|
+ if (curExpectedReplicas > curReplicas) {
|
|
|
+ //Set to true only if strictly under-replicated
|
|
|
+ status = true;
|
|
|
+ }
|
|
|
if (!neededReplications.contains(block) &&
|
|
|
pendingReplications.getNumReplicas(block) == 0) {
|
|
|
//
|
|
@@ -1357,16 +1396,23 @@ public class BlockManager {
|
|
|
synchronized (namesystem) {
|
|
|
NumberReplicas repl = countNodes(block);
|
|
|
int curExpectedReplicas = getReplication(block);
|
|
|
- neededReplications.update(block, repl.liveReplicas(), repl
|
|
|
- .decommissionedReplicas(), curExpectedReplicas, curReplicasDelta,
|
|
|
- expectedReplicasDelta);
|
|
|
+ if (isNeededReplication(block, curExpectedReplicas, repl.liveReplicas())) {
|
|
|
+ neededReplications.update(block, repl.liveReplicas(), repl
|
|
|
+ .decommissionedReplicas(), curExpectedReplicas, curReplicasDelta,
|
|
|
+ expectedReplicasDelta);
|
|
|
+ } else {
|
|
|
+ int oldReplicas = repl.liveReplicas()-curReplicasDelta;
|
|
|
+ int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
|
|
|
+ neededReplications.remove(block, oldReplicas, repl.decommissionedReplicas(),
|
|
|
+ oldExpectedReplicas);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
void checkReplication(Block block, int numExpectedReplicas) {
|
|
|
// filter out containingNodes that are marked for decommission.
|
|
|
NumberReplicas number = countNodes(block);
|
|
|
- if (number.liveReplicas() < numExpectedReplicas) {
|
|
|
+ if (isNeededReplication(block, numExpectedReplicas, number.liveReplicas())) {
|
|
|
neededReplications.add(block,
|
|
|
number.liveReplicas(),
|
|
|
number.decommissionedReplicas,
|
|
@@ -1448,7 +1494,68 @@ public class BlockManager {
|
|
|
return blocksToInvalidate.size();
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ //Returns the number of racks over which a given block is replicated
|
|
|
+ //decommissioning/decommissioned nodes are not counted. corrupt replicas
|
|
|
+ //are also ignored
|
|
|
+ int getNumberOfRacks(Block b) {
|
|
|
+ HashSet<String> rackSet = new HashSet<String>(0);
|
|
|
+ Collection<DatanodeDescriptor> corruptNodes =
|
|
|
+ corruptReplicas.getNodes(b);
|
|
|
+ for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(b);
|
|
|
+ it.hasNext();) {
|
|
|
+ DatanodeDescriptor cur = it.next();
|
|
|
+ if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
|
|
|
+ if ((corruptNodes == null ) || !corruptNodes.contains(cur)) {
|
|
|
+ String rackName = cur.getNetworkLocation();
|
|
|
+ if (!rackSet.contains(rackName)) {
|
|
|
+ rackSet.add(rackName);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return rackSet.size();
|
|
|
+ }
|
|
|
|
|
|
+ boolean blockHasEnoughRacks(Block b) {
|
|
|
+ if (!this.shouldCheckForEnoughRacks) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ boolean enoughRacks = false;;
|
|
|
+ Collection<DatanodeDescriptor> corruptNodes =
|
|
|
+ corruptReplicas.getNodes(b);
|
|
|
+ int numExpectedReplicas = getReplication(b);
|
|
|
+ String rackName = null;
|
|
|
+ for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(b);
|
|
|
+ it.hasNext();) {
|
|
|
+ DatanodeDescriptor cur = it.next();
|
|
|
+ if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
|
|
|
+ if ((corruptNodes == null ) || !corruptNodes.contains(cur)) {
|
|
|
+ if (numExpectedReplicas == 1) {
|
|
|
+ enoughRacks = true;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ String rackNameNew = cur.getNetworkLocation();
|
|
|
+ if (rackName == null) {
|
|
|
+ rackName = rackNameNew;
|
|
|
+ } else if (!rackName.equals(rackNameNew)) {
|
|
|
+ enoughRacks = true;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return enoughRacks;
|
|
|
+ }
|
|
|
+
|
|
|
+ boolean isNeededReplication(Block b, int expectedReplication, int curReplicas) {
|
|
|
+ if ((curReplicas >= expectedReplication) && (blockHasEnoughRacks(b))) {
|
|
|
+ return false;
|
|
|
+ } else {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
long getMissingBlocksCount() {
|
|
|
// not locking
|
|
|
return Math.max(missingBlocksInPrevIter, missingBlocksInCurIter);
|
|
@@ -1483,4 +1590,26 @@ public class BlockManager {
|
|
|
float getLoadFactor() {
|
|
|
return blocksMap.getLoadFactor();
|
|
|
}
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Return a range of corrupt replica block ids. Up to numExpectedBlocks
|
|
|
+ * blocks starting at the next block after startingBlockId are returned
|
|
|
+ * (fewer if numExpectedBlocks blocks are unavailable). If startingBlockId
|
|
|
+ * is null, up to numExpectedBlocks blocks are returned from the beginning.
|
|
|
+ * If startingBlockId cannot be found, null is returned.
|
|
|
+ *
|
|
|
+ * @param numExpectedBlocks Number of block ids to return.
|
|
|
+ * 0 <= numExpectedBlocks <= 100
|
|
|
+ * @param startingBlockId Block id from which to start. If null, start at
|
|
|
+ * beginning.
|
|
|
+ * @return Up to numExpectedBlocks blocks from startingBlockId if it exists
|
|
|
+ *
|
|
|
+ */
|
|
|
+ long[] getCorruptReplicaBlockIds(int numExpectedBlocks,
|
|
|
+ Long startingBlockId) {
|
|
|
+ return corruptReplicas.getCorruptReplicaBlockIds(numExpectedBlocks,
|
|
|
+ startingBlockId);
|
|
|
+ }
|
|
|
+
|
|
|
}
|