|
@@ -152,7 +152,7 @@ class FSNamesystem implements FSConstants {
|
|
|
// We also store pending replication-orders.
|
|
|
// Set of: Block
|
|
|
//
|
|
|
- private Collection<Block> neededReplications = new TreeSet<Block>();
|
|
|
+ private UnderReplicationBlocks neededReplications = new UnderReplicationBlocks();
|
|
|
private Collection<Block> pendingReplications = new TreeSet<Block>();
|
|
|
|
|
|
//
|
|
@@ -277,7 +277,189 @@ class FSNamesystem implements FSConstants {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+  /* Look up the replication factor of the file that owns a block.
+   * @param block the block to look up
+   * @return the owning file's replication factor, or 0 when
+   *         dir.getFileByBlock finds no file for the block
+   */
+  private int getReplication( Block block ) {
+    FSDirectory.INode owner = dir.getFileByBlock(block);
+    // a block that does not belong to any file has no replication target
+    return ( owner == null ) ? 0 : owner.getReplication();
+  }
|
|
|
|
|
|
+ /* Class for keeping track of under replication blocks
|
|
|
+ * Blocks have replication priority, with priority 0 indicating the highest
|
|
|
+ * Blocks have only one replicas has the highest
|
|
|
+ */
|
|
|
+ private class UnderReplicationBlocks {
|
|
|
+ private static final int LEVEL = 3;
|
|
|
+ TreeSet<Block>[] priorityQueues = new TreeSet[LEVEL];
|
|
|
+
|
|
|
+ /* constructor */
|
|
|
+ UnderReplicationBlocks() {
|
|
|
+ for(int i=0; i<LEVEL; i++) {
|
|
|
+ priorityQueues[i] = new TreeSet<Block>();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /* Return the total number of under replication blocks */
|
|
|
+ synchronized int size() {
|
|
|
+ int size = 0;
|
|
|
+ for( int i=0; i<LEVEL; i++ ) {
|
|
|
+ size += priorityQueues[i].size();
|
|
|
+ }
|
|
|
+ return size;
|
|
|
+ }
|
|
|
+
|
|
|
+ /* Return the priority of a block
|
|
|
+ * @param block a under replication block
|
|
|
+ * @param curReplicas current number of replicas of the block
|
|
|
+ * @param expectedReplicas expected number of replicas of the block
|
|
|
+ */
|
|
|
+ private int getPriority(Block block,
|
|
|
+ int curReplicas, int expectedReplicas) {
|
|
|
+ if (curReplicas>=expectedReplicas) {
|
|
|
+ return LEVEL; // no need to replicate
|
|
|
+ } else if(curReplicas==1) {
|
|
|
+ return 0; // highest priority
|
|
|
+ } else if(curReplicas*3<expectedReplicas) {
|
|
|
+ return 1;
|
|
|
+ } else {
|
|
|
+ return 2;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /* add a block to a under replication queue according to its priority
|
|
|
+ * @param block a under replication block
|
|
|
+ * @param curReplicas current number of replicas of the block
|
|
|
+ * @param expectedReplicas expected number of replicas of the block
|
|
|
+ */
|
|
|
+ synchronized boolean add(
|
|
|
+ Block block, int curReplicas, int expectedReplicas) {
|
|
|
+ if(expectedReplicas <= curReplicas) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ int priLevel = getPriority(block, curReplicas, expectedReplicas);
|
|
|
+ if( priorityQueues[priLevel].add(block) ) {
|
|
|
+ NameNode.stateChangeLog.debug(
|
|
|
+ "BLOCK* NameSystem.UnderReplicationBlock.add:"
|
|
|
+ + block.getBlockName()
|
|
|
+ + " has only "+curReplicas
|
|
|
+ + " replicas and need " + expectedReplicas
|
|
|
+ + " replicas so is added to neededReplications"
|
|
|
+ + " at priority level " + priLevel );
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ /* add a block to a under replication queue */
|
|
|
+ synchronized boolean add(Block block) {
|
|
|
+ int curReplicas = countContainingNodes(blocksMap.get(block));
|
|
|
+ int expectedReplicas = getReplication(block);
|
|
|
+ return add(block, curReplicas, expectedReplicas);
|
|
|
+ }
|
|
|
+
|
|
|
+ /* remove a block from a under replication queue */
|
|
|
+ synchronized boolean remove(Block block,
|
|
|
+ int oldReplicas, int oldExpectedReplicas) {
|
|
|
+ if(oldExpectedReplicas <= oldReplicas) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ int priLevel = getPriority(block, oldReplicas, oldExpectedReplicas);
|
|
|
+ return remove(block, priLevel);
|
|
|
+ }
|
|
|
+
|
|
|
+ /* remove a block from a under replication queue given a priority*/
|
|
|
+ private boolean remove(Block block, int priLevel ) {
|
|
|
+ if( priorityQueues[priLevel].remove(block) ) {
|
|
|
+ NameNode.stateChangeLog.debug(
|
|
|
+ "BLOCK* NameSystem.UnderReplicationBlock.remove: "
|
|
|
+ + "Removing block " + block.getBlockName()
|
|
|
+ + " from priority queue "+ priLevel );
|
|
|
+ return true;
|
|
|
+ } else {
|
|
|
+ for(int i=0; i<LEVEL; i++) {
|
|
|
+ if( i!=priLevel && priorityQueues[i].remove(block) ) {
|
|
|
+ NameNode.stateChangeLog.debug(
|
|
|
+ "BLOCK* NameSystem.UnderReplicationBlock.remove: "
|
|
|
+ + "Removing block " + block.getBlockName()
|
|
|
+ + " from priority queue "+ i );
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ /* remove a block from a under replication queue */
|
|
|
+ synchronized boolean remove(Block block) {
|
|
|
+ int curReplicas = countContainingNodes(blocksMap.get(block));
|
|
|
+ int expectedReplicas = getReplication(block);
|
|
|
+ return remove(block, curReplicas, expectedReplicas);
|
|
|
+ }
|
|
|
+
|
|
|
+ /* update the priority level of a block */
|
|
|
+ synchronized void update(Block block,
|
|
|
+ int curReplicasDelta, int expectedReplicasDelta) {
|
|
|
+ int curReplicas = countContainingNodes(blocksMap.get(block));
|
|
|
+ int curExpectedReplicas = getReplication(block);
|
|
|
+ int oldReplicas = curReplicas-curReplicasDelta;
|
|
|
+ int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
|
|
|
+ int curPri = getPriority(block, curReplicas, curExpectedReplicas);
|
|
|
+ int oldPri = getPriority(block, oldReplicas, oldExpectedReplicas);
|
|
|
+ if( oldPri != LEVEL && oldPri != curPri ) {
|
|
|
+ remove(block, oldPri);
|
|
|
+ }
|
|
|
+ if( curPri != LEVEL && oldPri != curPri
|
|
|
+ && priorityQueues[curPri].add(block)) {
|
|
|
+ NameNode.stateChangeLog.debug(
|
|
|
+ "BLOCK* NameSystem.UnderReplicationBlock.update:"
|
|
|
+ + block.getBlockName()
|
|
|
+ + " has only "+curReplicas
|
|
|
+ + " replicas and need " + curExpectedReplicas
|
|
|
+ + " replicas so is added to neededReplications"
|
|
|
+ + " at priority level " + curPri );
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /* return a iterator of all the under replication blocks */
|
|
|
+ synchronized Iterator<Block> iterator() {
|
|
|
+ return new Iterator<Block>() {
|
|
|
+ int level;
|
|
|
+ Iterator<Block>[] iterator = new Iterator[LEVEL];
|
|
|
+
|
|
|
+ {
|
|
|
+ level=0;
|
|
|
+ for(int i=0; i<LEVEL; i++) {
|
|
|
+ iterator[i] = priorityQueues[i].iterator();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void update() {
|
|
|
+ while( level< LEVEL-1 && !iterator[level].hasNext() ) {
|
|
|
+ level++;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public Block next() {
|
|
|
+ update();
|
|
|
+ return iterator[level].next();
|
|
|
+ }
|
|
|
+
|
|
|
+ public boolean hasNext() {
|
|
|
+ update();
|
|
|
+ return iterator[level].hasNext();
|
|
|
+ }
|
|
|
+
|
|
|
+ public void remove() {
|
|
|
+ throw new UnsupportedOperationException();
|
|
|
+ }
|
|
|
+ };
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/////////////////////////////////////////////////////////
|
|
|
//
|
|
|
// These methods are called by HadoopFS clients
|
|
@@ -347,20 +529,18 @@ class FSNamesystem implements FSConstants {
|
|
|
if( oldRepl == replication ) // the same replication
|
|
|
return true;
|
|
|
|
|
|
- synchronized( neededReplications ) {
|
|
|
- if( oldRepl < replication ) {
|
|
|
- // old replication < the new one; need to replicate
|
|
|
- LOG.info("Increasing replication for file " + src
|
|
|
- + ". New replication is " + replication );
|
|
|
- for( int idx = 0; idx < fileBlocks.length; idx++ )
|
|
|
- neededReplications.add( fileBlocks[idx] );
|
|
|
- } else {
|
|
|
- // old replication > the new one; need to remove copies
|
|
|
- LOG.info("Reducing replication for file " + src
|
|
|
- + ". New replication is " + replication );
|
|
|
- for( int idx = 0; idx < fileBlocks.length; idx++ )
|
|
|
- proccessOverReplicatedBlock( fileBlocks[idx], replication );
|
|
|
- }
|
|
|
+ // update needReplication priority queues
|
|
|
+ LOG.info("Increasing replication for file " + src
|
|
|
+ + ". New replication is " + replication );
|
|
|
+ for( int idx = 0; idx < fileBlocks.length; idx++ )
|
|
|
+ neededReplications.update( fileBlocks[idx], 0, replication-oldRepl );
|
|
|
+
|
|
|
+ if( oldRepl > replication ) {
|
|
|
+ // old replication > the new one; need to remove copies
|
|
|
+ LOG.info("Reducing replication for file " + src
|
|
|
+ + ". New replication is " + replication );
|
|
|
+ for( int idx = 0; idx < fileBlocks.length; idx++ )
|
|
|
+ proccessOverReplicatedBlock( fileBlocks[idx], replication );
|
|
|
}
|
|
|
return true;
|
|
|
}
|
|
@@ -715,19 +895,15 @@ class FSNamesystem implements FSConstants {
|
|
|
|
|
|
// Now that the file is real, we need to be sure to replicate
|
|
|
// the blocks.
|
|
|
+ int numExpectedReplicas = pendingFile.getReplication();
|
|
|
for (int i = 0; i < nrBlocks; i++) {
|
|
|
SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(pendingBlocks[i]);
|
|
|
// filter out containingNodes that are marked for decommission.
|
|
|
int numCurrentReplica = countContainingNodes(containingNodes);
|
|
|
|
|
|
- if (numCurrentReplica < pendingFile.getReplication()) {
|
|
|
- NameNode.stateChangeLog.debug(
|
|
|
- "DIR* NameSystem.completeFile:"
|
|
|
- + pendingBlocks[i].getBlockName()+" has only "+containingNodes.size()
|
|
|
- +" replicas so is added to neededReplications");
|
|
|
- synchronized (neededReplications) {
|
|
|
- neededReplications.add(pendingBlocks[i]);
|
|
|
- }
|
|
|
+ if (numCurrentReplica < numExpectedReplicas) {
|
|
|
+ neededReplications.add(
|
|
|
+ pendingBlocks[i], numCurrentReplica, numExpectedReplicas);
|
|
|
}
|
|
|
}
|
|
|
return COMPLETE_SUCCESS;
|
|
@@ -1608,8 +1784,10 @@ class FSNamesystem implements FSConstants {
|
|
|
containingNodes = new TreeSet<DatanodeDescriptor>();
|
|
|
blocksMap.put(block, containingNodes);
|
|
|
}
|
|
|
+ int curReplicaDelta = 0;
|
|
|
if (! containingNodes.contains(node)) {
|
|
|
containingNodes.add(node);
|
|
|
+ curReplicaDelta = 1;
|
|
|
//
|
|
|
// Hairong: I would prefer to set the level of next logrecord
|
|
|
// to be debug.
|
|
@@ -1625,34 +1803,24 @@ class FSNamesystem implements FSConstants {
|
|
|
+ block.getBlockName() + " on " + node.getName());
|
|
|
}
|
|
|
|
|
|
- synchronized (neededReplications) {
|
|
|
- FSDirectory.INode fileINode = dir.getFileByBlock(block);
|
|
|
- if( fileINode == null ) // block does not belong to any file
|
|
|
- return;
|
|
|
-
|
|
|
- // filter out containingNodes that are marked for decommission.
|
|
|
- int numCurrentReplica = countContainingNodes(containingNodes);
|
|
|
-
|
|
|
- // check whether safe replication is reached for the block
|
|
|
- // only if it is a part of a files
|
|
|
- incrementSafeBlockCount( numCurrentReplica );
|
|
|
- short fileReplication = fileINode.getReplication();
|
|
|
- if (numCurrentReplica >= fileReplication ) {
|
|
|
- neededReplications.remove(block);
|
|
|
- pendingReplications.remove(block);
|
|
|
- NameNode.stateChangeLog.trace("BLOCK* NameSystem.addStoredBlock: "
|
|
|
- +block.getBlockName()+" has "+ numCurrentReplica
|
|
|
- +" replicas so is removed from neededReplications and pendingReplications" );
|
|
|
-
|
|
|
- } else {// numCurrentReplica < fileReplication
|
|
|
- neededReplications.add(block);
|
|
|
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.addStoredBlock: "
|
|
|
- +block.getBlockName()+" has only "+ numCurrentReplica
|
|
|
- +" replicas so is added to neededReplications" );
|
|
|
- }
|
|
|
-
|
|
|
- proccessOverReplicatedBlock( block, fileReplication );
|
|
|
- }
|
|
|
+ FSDirectory.INode fileINode = dir.getFileByBlock(block);
|
|
|
+ if( fileINode == null ) // block does not belong to any file
|
|
|
+ return;
|
|
|
+
|
|
|
+ // filter out containingNodes that are marked for decommission.
|
|
|
+ int numCurrentReplica = countContainingNodes(containingNodes);
|
|
|
+
|
|
|
+ // check whether safe replication is reached for the block
|
|
|
+ // only if it is a part of a files
|
|
|
+ incrementSafeBlockCount( numCurrentReplica );
|
|
|
+
|
|
|
+ // handle underReplication/overReplication
|
|
|
+ short fileReplication = fileINode.getReplication();
|
|
|
+ neededReplications.update(block, curReplicaDelta, 0);
|
|
|
+ if (numCurrentReplica >= fileReplication ) {
|
|
|
+ pendingReplications.remove(block);
|
|
|
+ }
|
|
|
+ proccessOverReplicatedBlock( block, fileReplication );
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1748,8 +1916,12 @@ class FSNamesystem implements FSConstants {
|
|
|
return;
|
|
|
}
|
|
|
containingNodes.remove(node);
|
|
|
- decrementSafeBlockCount( containingNodes.size() );
|
|
|
- if( containingNodes.size() == 0 )
|
|
|
+
|
|
|
+ // filter out containingNodes that are marked for decommission.
|
|
|
+ int numCurrentReplica = countContainingNodes(containingNodes);
|
|
|
+
|
|
|
+ decrementSafeBlockCount( numCurrentReplica );
|
|
|
+ if( containingNodes.isEmpty() )
|
|
|
blocksMap.remove(block);
|
|
|
//
|
|
|
// It's possible that the block was removed because of a datanode
|
|
@@ -1758,13 +1930,8 @@ class FSNamesystem implements FSConstants {
|
|
|
// be-replicated list.
|
|
|
//
|
|
|
FSDirectory.INode fileINode = dir.getFileByBlock(block);
|
|
|
- if( fileINode != null && (countContainingNodes(containingNodes) < fileINode.getReplication())) {
|
|
|
- synchronized (neededReplications) {
|
|
|
- neededReplications.add(block);
|
|
|
- }
|
|
|
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
|
|
|
- +block.getBlockName()+" has only "+containingNodes.size()
|
|
|
- +" replicas so is added to neededReplications" );
|
|
|
+ if( fileINode != null ) {
|
|
|
+ neededReplications.update(block, -1, 0);
|
|
|
}
|
|
|
|
|
|
//
|
|
@@ -1896,9 +2063,7 @@ class FSNamesystem implements FSConstants {
|
|
|
// replicated.
|
|
|
Block decommissionBlocks[] = node.getBlocks();
|
|
|
for (int j = 0; j < decommissionBlocks.length; j++) {
|
|
|
- synchronized (neededReplications) {
|
|
|
- neededReplications.add(decommissionBlocks[j]);
|
|
|
- }
|
|
|
+ neededReplications.update(decommissionBlocks[j], -1, 0);
|
|
|
}
|
|
|
}
|
|
|
break;
|
|
@@ -2107,15 +2272,13 @@ class FSNamesystem implements FSConstants {
|
|
|
* reside on the specified node. Otherwise returns false.
|
|
|
*/
|
|
|
private boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
- synchronized (neededReplications) {
for (Iterator<Block> it = neededReplications.iterator(); it.hasNext();){
- Block block = it.next();
- Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block);
- if (containingNodes.contains(srcNode)) {
- return true;
- }
+ Block block = it.next();
+ Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block);
+ // blocksMap.get returns null when the map holds no entry for the block;
+ // a block can still be queued for replication in that state, so skip it
+ // instead of dereferencing null
+ if (containingNodes != null && containingNodes.contains(srcNode)) {
+ return true;
+ }
}
- }
return false;
}
|
|
|
|
|
@@ -2237,9 +2400,10 @@ class FSNamesystem implements FSConstants {
|
|
|
DatanodeDescriptor targets[] =
|
|
|
(DatanodeDescriptor[]) replicateTargetSets.get(i);
|
|
|
int numCurrentReplica = numCurrentReplicas.get(i).intValue();
|
|
|
- if (numCurrentReplica + targets.length >=
|
|
|
- dir.getFileByBlock( block).getReplication() ) {
|
|
|
- neededReplications.remove(block);
|
|
|
+ int numExpectedReplica = dir.getFileByBlock( block).getReplication();
|
|
|
+ neededReplications.update(
|
|
|
+ block, numCurrentReplica, numExpectedReplica);
|
|
|
+ if (numCurrentReplica + targets.length >= numExpectedReplica) {
|
|
|
pendingReplications.add(block);
|
|
|
NameNode.stateChangeLog.debug(
|
|
|
"BLOCK* NameSystem.pendingTransfer: "
|