|
@@ -30,6 +30,7 @@ import org.apache.hadoop.ipc.Server;
|
|
|
|
|
|
import java.io.*;
|
|
|
import java.util.*;
|
|
|
+import java.lang.UnsupportedOperationException;
|
|
|
|
|
|
import javax.servlet.ServletContext;
|
|
|
import javax.servlet.ServletException;
|
|
@@ -60,11 +61,10 @@ class FSNamesystem implements FSConstants {
|
|
|
//
|
|
|
// Stores the block-->datanode(s) map. Updated only in response
|
|
|
// to client-sent information.
|
|
|
- // Mapping: Block -> TreeSet<DatanodeDescriptor>
|
|
|
+ // Mapping: Block -> { INode, datanodes, self ref }
|
|
|
//
|
|
|
- Map<Block, List<DatanodeDescriptor>> blocksMap =
|
|
|
- new HashMap<Block, List<DatanodeDescriptor>>();
|
|
|
-
|
|
|
+ BlocksMap blocksMap = new BlocksMap();
|
|
|
+
|
|
|
/**
|
|
|
* Stores the datanode -> block map.
|
|
|
* <p>
|
|
@@ -245,7 +245,7 @@ class FSNamesystem implements FSConstants {
|
|
|
|
|
|
this.localMachine = hostname;
|
|
|
this.port = port;
|
|
|
- this.dir = new FSDirectory();
|
|
|
+ this.dir = new FSDirectory( this );
|
|
|
StartupOption startOpt = (StartupOption)conf.get(
|
|
|
"dfs.namenode.startup", StartupOption.REGULAR );
|
|
|
this.dir.loadFSImage( getNamespaceDirs(conf), startOpt );
|
|
@@ -299,7 +299,7 @@ class FSNamesystem implements FSConstants {
|
|
|
*/
|
|
|
FSNamesystem(FSImage fsImage) throws IOException {
|
|
|
fsNamesystemObject = this;
|
|
|
- this.dir = new FSDirectory(fsImage);
|
|
|
+ this.dir = new FSDirectory(fsImage, this);
|
|
|
}
|
|
|
|
|
|
/** Return the FSNamesystem object
|
|
@@ -366,14 +366,11 @@ class FSNamesystem implements FSConstants {
|
|
|
for (Iterator<Block> it = neededReplications.iterator();
|
|
|
it.hasNext();) {
|
|
|
Block block = it.next();
|
|
|
- Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block);
|
|
|
out.print(block);
|
|
|
- if (containingNodes != null) {
|
|
|
- for (Iterator<DatanodeDescriptor> jt = containingNodes.iterator();
|
|
|
- jt.hasNext(); ) {
|
|
|
- DatanodeDescriptor node = jt.next();
|
|
|
- out.print(" " + node + " : " );
|
|
|
- }
|
|
|
+ for (Iterator<DatanodeDescriptor> jt = blocksMap.nodeIterator(block);
|
|
|
+ jt.hasNext(); ) {
|
|
|
+ DatanodeDescriptor node = jt.next();
|
|
|
+ out.print(" " + node + " : " );
|
|
|
}
|
|
|
out.println("");
|
|
|
}
|
|
@@ -401,7 +398,7 @@ class FSNamesystem implements FSConstants {
|
|
|
|
|
|
/* get replication factor of a block */
|
|
|
private int getReplication( Block block ) {
|
|
|
- FSDirectory.INode fileINode = dir.getFileByBlock(block);
|
|
|
+ FSDirectory.INode fileINode = blocksMap.getINode( block );
|
|
|
if( fileINode == null ) { // block does not belong to any file
|
|
|
return 0;
|
|
|
} else {
|
|
@@ -485,9 +482,10 @@ class FSNamesystem implements FSConstants {
|
|
|
|
|
|
/* add a block to a under replication queue */
|
|
|
synchronized boolean add(Block block) {
|
|
|
- int curReplicas = countContainingNodes(blocksMap.get(block));
|
|
|
int expectedReplicas = getReplication(block);
|
|
|
- return add(block, curReplicas, expectedReplicas);
|
|
|
+ return add(block,
|
|
|
+ countContainingNodes( block ),
|
|
|
+ expectedReplicas);
|
|
|
}
|
|
|
|
|
|
/* remove a block from a under replication queue */
|
|
@@ -522,7 +520,7 @@ class FSNamesystem implements FSConstants {
|
|
|
|
|
|
/* remove a block from a under replication queue */
|
|
|
synchronized boolean remove(Block block) {
|
|
|
- int curReplicas = countContainingNodes(blocksMap.get(block));
|
|
|
+ int curReplicas = countContainingNodes( block );
|
|
|
int expectedReplicas = getReplication(block);
|
|
|
return remove(block, curReplicas, expectedReplicas);
|
|
|
}
|
|
@@ -530,7 +528,7 @@ class FSNamesystem implements FSConstants {
|
|
|
/* update the priority level of a block */
|
|
|
synchronized void update(Block block,
|
|
|
int curReplicasDelta, int expectedReplicasDelta) {
|
|
|
- int curReplicas = countContainingNodes(blocksMap.get(block));
|
|
|
+ int curReplicas = countContainingNodes( block );
|
|
|
int curExpectedReplicas = getReplication(block);
|
|
|
int oldReplicas = curReplicas-curReplicasDelta;
|
|
|
int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
|
|
@@ -614,20 +612,20 @@ class FSNamesystem implements FSConstants {
|
|
|
if (blocks != null) {
|
|
|
results = new Object[2];
|
|
|
DatanodeDescriptor machineSets[][] = new DatanodeDescriptor[blocks.length][];
|
|
|
- DatanodeDescriptor clientNode = getDatanodeByHost(clientMachine);
|
|
|
|
|
|
for (int i = 0; i < blocks.length; i++) {
|
|
|
- Collection<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
|
|
|
- if (containingNodes == null) {
|
|
|
+ int numNodes = blocksMap.numNodes( blocks[i] );
|
|
|
+ if ( numNodes <= 0 ) {
|
|
|
machineSets[i] = new DatanodeDescriptor[0];
|
|
|
} else {
|
|
|
- machineSets[i] = new DatanodeDescriptor[containingNodes.size()];
|
|
|
- ArrayList<DatanodeDescriptor> containingNodesList =
|
|
|
- new ArrayList<DatanodeDescriptor>(containingNodes.size());
|
|
|
- containingNodesList.addAll(containingNodes);
|
|
|
-
|
|
|
- machineSets[i] = replicator.sortByDistance(
|
|
|
- clientNode, containingNodesList);
|
|
|
+ machineSets[i] = new DatanodeDescriptor[ numNodes ];
|
|
|
+ numNodes = 0;
|
|
|
+ for( Iterator<DatanodeDescriptor> it =
|
|
|
+ blocksMap.nodeIterator( blocks[i] ); it.hasNext(); ) {
|
|
|
+ machineSets[i][ numNodes++ ] = it.next();
|
|
|
+ }
|
|
|
+ clusterMap.sortByDistance( getDatanodeByHost(clientMachine),
|
|
|
+ machineSets[i] );
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -998,9 +996,7 @@ class FSNamesystem implements FSConstants {
|
|
|
//
|
|
|
for (int i = 0; i < nrBlocks; i++) {
|
|
|
Block b = pendingBlocks[i];
|
|
|
- List<DatanodeDescriptor> containingNodes = blocksMap.get(b);
|
|
|
- Block storedBlock =
|
|
|
- containingNodes.get(0).getBlock(b);
|
|
|
+ Block storedBlock = blocksMap.getStoredBlock( b );
|
|
|
if ( storedBlock != null ) {
|
|
|
pendingBlocks[i] = storedBlock;
|
|
|
}
|
|
@@ -1044,10 +1040,8 @@ class FSNamesystem implements FSConstants {
|
|
|
// the blocks.
|
|
|
int numExpectedReplicas = pendingFile.getReplication();
|
|
|
for (int i = 0; i < nrBlocks; i++) {
|
|
|
- Collection<DatanodeDescriptor> containingNodes = blocksMap.get(pendingBlocks[i]);
|
|
|
- // filter out containingNodes that are marked for decommission.
|
|
|
- int numCurrentReplica = countContainingNodes(containingNodes);
|
|
|
-
|
|
|
+ // filter out containingNodes that are marked for decommission.
|
|
|
+ int numCurrentReplica = countContainingNodes( pendingBlocks[i] );
|
|
|
if (numCurrentReplica < numExpectedReplicas) {
|
|
|
neededReplications.add(
|
|
|
pendingBlocks[i], numCurrentReplica, numExpectedReplicas);
|
|
@@ -1065,7 +1059,7 @@ class FSNamesystem implements FSConstants {
|
|
|
Block b = null;
|
|
|
do {
|
|
|
b = new Block(FSNamesystem.randBlockId.nextLong(), 0);
|
|
|
- } while (dir.isValidBlock(b));
|
|
|
+ } while ( isValidBlock(b) );
|
|
|
FileUnderConstruction v = pendingCreates.get(src);
|
|
|
v.getBlocks().add(b);
|
|
|
pendingCreateBlocks.add(b);
|
|
@@ -1083,9 +1077,7 @@ class FSNamesystem implements FSConstants {
|
|
|
FileUnderConstruction v = pendingCreates.get(src);
|
|
|
|
|
|
for (Iterator<Block> it = v.getBlocks().iterator(); it.hasNext(); ) {
|
|
|
- Block b = it.next();
|
|
|
- Collection<DatanodeDescriptor> containingNodes = blocksMap.get(b);
|
|
|
- if (containingNodes == null || containingNodes.size() < this.minReplication) {
|
|
|
+ if ( blocksMap.numNodes(it.next()) < this.minReplication ) {
|
|
|
return false;
|
|
|
}
|
|
|
}
|
|
@@ -1145,24 +1137,20 @@ class FSNamesystem implements FSConstants {
|
|
|
throw new SafeModeException("Cannot invalidate block " + blk.getBlockName(), safeMode);
|
|
|
}
|
|
|
|
|
|
- Collection<DatanodeDescriptor> containingNodes = blocksMap.get(blk);
|
|
|
-
|
|
|
// Check how many copies we have of the block. If we have at least one
|
|
|
// copy on a live node, then we can delete it.
|
|
|
- if (containingNodes != null ) {
|
|
|
- if ((countContainingNodes(containingNodes) > 1) ||
|
|
|
- ((countContainingNodes(containingNodes) == 1) &&
|
|
|
- (dn.isDecommissionInProgress() || dn.isDecommissioned()))) {
|
|
|
+ int count = countContainingNodes( blk );
|
|
|
+ if ( (count > 1) || ( (count == 1) && ( dn.isDecommissionInProgress() ||
|
|
|
+ dn.isDecommissioned() ))) {
|
|
|
addToInvalidates(blk, dn);
|
|
|
removeStoredBlock(blk, getDatanode(dn));
|
|
|
NameNode.stateChangeLog.info("BLOCK* NameSystem.invalidateBlocks: "
|
|
|
+ blk.getBlockName() + " on "
|
|
|
+ dn.getName() + " listed for deletion.");
|
|
|
- } else {
|
|
|
+ } else {
|
|
|
NameNode.stateChangeLog.info("BLOCK* NameSystem.invalidateBlocks: "
|
|
|
+ blk.getBlockName() + " on "
|
|
|
+ dn.getName() + " is the only copy and was not deleted.");
|
|
|
- }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1202,15 +1190,14 @@ class FSNamesystem implements FSConstants {
|
|
|
if (deletedBlocks != null) {
|
|
|
for (int i = 0; i < deletedBlocks.length; i++) {
|
|
|
Block b = deletedBlocks[i];
|
|
|
-
|
|
|
- Collection<DatanodeDescriptor> containingNodes = blocksMap.get(b);
|
|
|
- if (containingNodes != null) {
|
|
|
- for (Iterator<DatanodeDescriptor> it = containingNodes.iterator(); it.hasNext(); ) {
|
|
|
- DatanodeDescriptor node = it.next();
|
|
|
- addToInvalidates(b, node);
|
|
|
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.delete: "
|
|
|
- + b.getBlockName() + " is added to invalidSet of " + node.getName() );
|
|
|
- }
|
|
|
+
|
|
|
+ for ( Iterator<DatanodeDescriptor> it =
|
|
|
+ blocksMap.nodeIterator( b ); it.hasNext(); ) {
|
|
|
+ DatanodeDescriptor node = it.next();
|
|
|
+ addToInvalidates(b, node);
|
|
|
+ NameNode.stateChangeLog.debug("BLOCK* NameSystem.delete: "
|
|
|
+ + b.getBlockName() + " is added to invalidSet of "
|
|
|
+ + node.getName() );
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -1327,12 +1314,10 @@ class FSNamesystem implements FSConstants {
|
|
|
} else {
|
|
|
String hosts[][] = new String[(endBlock - startBlock) + 1][];
|
|
|
for (int i = startBlock; i <= endBlock; i++) {
|
|
|
- Collection<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
|
|
|
Collection<String> v = new ArrayList<String>();
|
|
|
- if (containingNodes != null) {
|
|
|
- for (Iterator<DatanodeDescriptor> it =containingNodes.iterator(); it.hasNext();) {
|
|
|
+ for ( Iterator<DatanodeDescriptor> it =
|
|
|
+ blocksMap.nodeIterator( blocks[i] ); it.hasNext(); ) {
|
|
|
v.add( it.next().getHostName() );
|
|
|
- }
|
|
|
}
|
|
|
hosts[i-startBlock] = v.toArray(new String[v.size()]);
|
|
|
}
|
|
@@ -2169,7 +2154,7 @@ class FSNamesystem implements FSConstants {
|
|
|
// they are added to recentInvalidateSets and will be sent out
|
|
|
// thorugh succeeding heartbeat responses.
|
|
|
//
|
|
|
- if (! dir.isValidBlock(b) && ! pendingCreateBlocks.contains(b)) {
|
|
|
+ if (! isValidBlock(b) && ! pendingCreateBlocks.contains(b)) {
|
|
|
if (obsolete.size() > FSConstants.BLOCK_INVALIDATE_CHUNK) {
|
|
|
addToInvalidates(b, node);
|
|
|
} else {
|
|
@@ -2188,28 +2173,23 @@ class FSNamesystem implements FSConstants {
|
|
|
* @return the block that is stored in blockMap.
|
|
|
*/
|
|
|
synchronized Block addStoredBlock(Block block, DatanodeDescriptor node) {
|
|
|
- List<DatanodeDescriptor> containingNodes = blocksMap.get(block);
|
|
|
- if (containingNodes == null) {
|
|
|
- //Create an arraylist with the current replication factor
|
|
|
- FSDirectory.INode inode = dir.getFileByBlock(block);
|
|
|
- int replication = (inode != null) ?
|
|
|
- inode.getReplication() : defaultReplication;
|
|
|
- containingNodes = new ArrayList<DatanodeDescriptor>(replication);
|
|
|
- blocksMap.put(block, containingNodes);
|
|
|
- } else {
|
|
|
- Block storedBlock =
|
|
|
- containingNodes.get(0).getBlock(block);
|
|
|
- // update stored block's length.
|
|
|
- if ( storedBlock != null ) {
|
|
|
- if ( block.getNumBytes() > 0 ) {
|
|
|
- storedBlock.setNumBytes( block.getNumBytes() );
|
|
|
- }
|
|
|
- block = storedBlock;
|
|
|
+
|
|
|
+ FSDirectory.INode fileINode = blocksMap.getINode( block );
|
|
|
+ int replication = (fileINode != null) ? fileINode.getReplication() :
|
|
|
+ defaultReplication;
|
|
|
+ boolean added = blocksMap.addNode( block, node, replication );
|
|
|
+
|
|
|
+ Block storedBlock = blocksMap.getStoredBlock( block ); //extra look up!
|
|
|
+ if ( storedBlock != null && block != storedBlock ) {
|
|
|
+ if ( block.getNumBytes() > 0 ) {
|
|
|
+ storedBlock.setNumBytes( block.getNumBytes() );
|
|
|
}
|
|
|
+ block = storedBlock;
|
|
|
}
|
|
|
+
|
|
|
int curReplicaDelta = 0;
|
|
|
- if (! containingNodes.contains(node)) {
|
|
|
- containingNodes.add(node);
|
|
|
+
|
|
|
+ if ( added ) {
|
|
|
curReplicaDelta = 1;
|
|
|
//
|
|
|
// Hairong: I would prefer to set the level of next logrecord
|
|
@@ -2226,12 +2206,11 @@ class FSNamesystem implements FSConstants {
|
|
|
+ block.getBlockName() + " on " + node.getName());
|
|
|
}
|
|
|
|
|
|
- FSDirectory.INode fileINode = dir.getFileByBlock(block);
|
|
|
if( fileINode == null ) // block does not belong to any file
|
|
|
return block;
|
|
|
|
|
|
// filter out containingNodes that are marked for decommission.
|
|
|
- int numCurrentReplica = countContainingNodes(containingNodes)
|
|
|
+ int numCurrentReplica = countContainingNodes( block )
|
|
|
+ pendingReplications.getNumReplicas(block);
|
|
|
|
|
|
// check whether safe replication is reached for the block
|
|
@@ -2255,11 +2234,9 @@ class FSNamesystem implements FSConstants {
|
|
|
* mark them in the excessReplicateMap.
|
|
|
*/
|
|
|
private void proccessOverReplicatedBlock( Block block, short replication ) {
|
|
|
- Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block);
|
|
|
- if( containingNodes == null )
|
|
|
- return;
|
|
|
Collection<DatanodeDescriptor> nonExcess = new ArrayList<DatanodeDescriptor>();
|
|
|
- for (Iterator<DatanodeDescriptor> it = containingNodes.iterator(); it.hasNext(); ) {
|
|
|
+ for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator( block );
|
|
|
+ it.hasNext(); ) {
|
|
|
DatanodeDescriptor cur = it.next();
|
|
|
Collection<Block> excessBlocks = excessReplicateMap.get(cur.getStorageID());
|
|
|
if (excessBlocks == null || ! excessBlocks.contains(block)) {
|
|
@@ -2335,27 +2312,20 @@ class FSNamesystem implements FSConstants {
|
|
|
synchronized void removeStoredBlock(Block block, DatanodeDescriptor node) {
|
|
|
NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
|
|
|
+block.getBlockName() + " from "+node.getName() );
|
|
|
- Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block);
|
|
|
- if (containingNodes == null || ! containingNodes.contains(node)) {
|
|
|
+ if ( !blocksMap.removeNode( block, node ) ) {
|
|
|
NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
|
|
|
+block.getBlockName()+" has already been removed from node "+node );
|
|
|
return;
|
|
|
}
|
|
|
- containingNodes.remove(node);
|
|
|
|
|
|
- // filter out containingNodes that are marked for decommission.
|
|
|
- int numCurrentReplica = countContainingNodes(containingNodes);
|
|
|
-
|
|
|
- decrementSafeBlockCount( numCurrentReplica );
|
|
|
- if( containingNodes.isEmpty() )
|
|
|
- blocksMap.remove(block);
|
|
|
+ decrementSafeBlockCount( block );
|
|
|
//
|
|
|
// It's possible that the block was removed because of a datanode
|
|
|
// failure. If the block is still valid, check if replication is
|
|
|
// necessary. In that case, put block on a possibly-will-
|
|
|
// be-replicated list.
|
|
|
//
|
|
|
- FSDirectory.INode fileINode = dir.getFileByBlock(block);
|
|
|
+ FSDirectory.INode fileINode = blocksMap.getINode( block );
|
|
|
if( fileINode != null ) {
|
|
|
neededReplications.update(block, -1, 0);
|
|
|
}
|
|
@@ -2636,28 +2606,27 @@ class FSNamesystem implements FSConstants {
|
|
|
* Counts the number of nodes in the given list. Skips over nodes
|
|
|
* that are marked for decommission.
|
|
|
*/
|
|
|
- private int countContainingNodes(Collection<DatanodeDescriptor> nodelist) {
|
|
|
- if( nodelist == null ) return 0;
|
|
|
+ private int countContainingNodes(Iterator<DatanodeDescriptor> nodeIter) {
|
|
|
int count = 0;
|
|
|
- for (Iterator<DatanodeDescriptor> it = nodelist.iterator();
|
|
|
- it.hasNext(); ) {
|
|
|
- DatanodeDescriptor node = it.next();
|
|
|
+ while ( nodeIter.hasNext() ) {
|
|
|
+ DatanodeDescriptor node = nodeIter.next();
|
|
|
if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
|
|
|
count++;
|
|
|
}
|
|
|
}
|
|
|
return count;
|
|
|
}
|
|
|
+
|
|
|
+ /** wrapper for countContainingNodes( Iterator ). */
|
|
|
+ private int countContainingNodes( Block b ) {
|
|
|
+ return countContainingNodes( blocksMap.nodeIterator( b ) );
|
|
|
+ }
|
|
|
|
|
|
- /*
|
|
|
- * Filter nodes that are marked for decommison in the given list.
|
|
|
- * Return a list of non-decommissioned nodes
|
|
|
- */
|
|
|
- private List<DatanodeDescriptor> filterDecommissionedNodes(
|
|
|
- Collection<DatanodeDescriptor> nodelist) {
|
|
|
- List<DatanodeDescriptor> nonCommissionedNodeList =
|
|
|
+ /** Reeturns a newly allocated list exluding the decommisioned nodes. */
|
|
|
+ ArrayList<DatanodeDescriptor> containingNodeList( Block b ) {
|
|
|
+ ArrayList<DatanodeDescriptor> nonCommissionedNodeList =
|
|
|
new ArrayList<DatanodeDescriptor>();
|
|
|
- for (Iterator<DatanodeDescriptor> it = nodelist.iterator();
|
|
|
+ for( Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator( b );
|
|
|
it.hasNext(); ) {
|
|
|
DatanodeDescriptor node = it.next();
|
|
|
if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
|
|
@@ -2674,15 +2643,9 @@ class FSNamesystem implements FSConstants {
|
|
|
Block decommissionBlocks[] = srcNode.getBlocks();
|
|
|
for (int i = 0; i < decommissionBlocks.length; i++) {
|
|
|
Block block = decommissionBlocks[i];
|
|
|
- FSDirectory.INode fileINode = dir.getFileByBlock(block);
|
|
|
- if (fileINode == null) {
|
|
|
- continue;
|
|
|
- }
|
|
|
- Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block);
|
|
|
- List<DatanodeDescriptor> nodes =
|
|
|
- filterDecommissionedNodes(containingNodes);
|
|
|
- int numCurrentReplica = nodes.size();
|
|
|
- if (fileINode.getReplication() > numCurrentReplica) {
|
|
|
+ FSDirectory.INode fileINode = blocksMap.getINode( block );
|
|
|
+ if ( fileINode != null &&
|
|
|
+ fileINode.getReplication() > countContainingNodes(block) ) {
|
|
|
return true;
|
|
|
}
|
|
|
}
|
|
@@ -2758,22 +2721,20 @@ class FSNamesystem implements FSConstants {
|
|
|
}
|
|
|
Block block = it.next();
|
|
|
long blockSize = block.getNumBytes();
|
|
|
- FSDirectory.INode fileINode = dir.getFileByBlock(block);
|
|
|
+ FSDirectory.INode fileINode = blocksMap.getINode( block );
|
|
|
if (fileINode == null) { // block does not belong to any file
|
|
|
it.remove();
|
|
|
} else {
|
|
|
- Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block);
|
|
|
+ List<DatanodeDescriptor> containingNodes =
|
|
|
+ containingNodeList(block);
|
|
|
Collection<Block> excessBlocks = excessReplicateMap.get(
|
|
|
srcNode.getStorageID() );
|
|
|
|
|
|
// srcNode must contain the block, and the block must
|
|
|
// not be scheduled for removal on that node
|
|
|
- if (containingNodes != null && containingNodes.contains(srcNode)
|
|
|
+ if (containingNodes.contains(srcNode)
|
|
|
&& (excessBlocks == null || ! excessBlocks.contains(block))) {
|
|
|
- // filter out containingNodes that are marked for decommission.
|
|
|
- List<DatanodeDescriptor> nodes =
|
|
|
- filterDecommissionedNodes(containingNodes);
|
|
|
- int numCurrentReplica = nodes.size() +
|
|
|
+ int numCurrentReplica = containingNodes.size() +
|
|
|
pendingReplications.getNumReplicas(block);
|
|
|
if (numCurrentReplica >= fileINode.getReplication()) {
|
|
|
it.remove();
|
|
@@ -2782,7 +2743,7 @@ class FSNamesystem implements FSConstants {
|
|
|
Math.min( fileINode.getReplication() - numCurrentReplica,
|
|
|
needed),
|
|
|
datanodeMap.get(srcNode.getStorageID()),
|
|
|
- nodes, null, blockSize);
|
|
|
+ containingNodes, null, blockSize);
|
|
|
if (targets.length > 0) {
|
|
|
// Build items to return
|
|
|
replicateBlocks.add(block);
|
|
@@ -2808,7 +2769,7 @@ class FSNamesystem implements FSConstants {
|
|
|
DatanodeDescriptor targets[] =
|
|
|
(DatanodeDescriptor[]) replicateTargetSets.get(i);
|
|
|
int numCurrentReplica = numCurrentReplicas.get(i).intValue();
|
|
|
- int numExpectedReplica = dir.getFileByBlock( block).getReplication();
|
|
|
+ int numExpectedReplica = blocksMap.getINode(block).getReplication();
|
|
|
if (numCurrentReplica + targets.length >= numExpectedReplica) {
|
|
|
neededReplications.remove(
|
|
|
block, numCurrentReplica, numExpectedReplica);
|
|
@@ -3291,29 +3252,8 @@ class FSNamesystem implements FSConstants {
|
|
|
}
|
|
|
return nodes.toArray( results );
|
|
|
}
|
|
|
-
|
|
|
- /** Return datanodes that sorted by their distances to <i>reader</i>
|
|
|
- */
|
|
|
- DatanodeDescriptor[] sortByDistance(
|
|
|
- final DatanodeDescriptor reader,
|
|
|
- List<DatanodeDescriptor> nodes ) {
|
|
|
- synchronized(clusterMap) {
|
|
|
- if(reader != null && clusterMap.contains(reader)) {
|
|
|
- java.util.Collections.sort(nodes, new Comparator<DatanodeDescriptor>() {
|
|
|
- public int compare(DatanodeDescriptor n1, DatanodeDescriptor n2) {
|
|
|
- return clusterMap.getDistance(reader, n1)
|
|
|
- -clusterMap.getDistance(reader, n2);
|
|
|
- }
|
|
|
- });
|
|
|
- }
|
|
|
- }
|
|
|
- return (DatanodeDescriptor[])nodes.toArray(
|
|
|
- new DatanodeDescriptor[nodes.size()]);
|
|
|
- }
|
|
|
-
|
|
|
} //end of Replicator
|
|
|
|
|
|
-
|
|
|
// Keeps track of which datanodes are allowed to connect to the namenode.
|
|
|
|
|
|
private boolean inHostsList(DatanodeID node) {
|
|
@@ -3582,7 +3522,7 @@ class FSNamesystem implements FSConstants {
|
|
|
* <em>safe blocks</em>, those that have at least the minimal number of
|
|
|
* replicas, and calculates the ratio of safe blocks to the total number
|
|
|
* of blocks in the system, which is the size of
|
|
|
- * {@link FSDirectory#activeBlocks}. When the ratio reaches the
|
|
|
+ * {@link #blocksMap}. When the ratio reaches the
|
|
|
* {@link #threshold} it starts the {@link SafeModeMonitor} daemon in order
|
|
|
* to monitor whether the safe mode extension is passed. Then it leaves safe
|
|
|
* mode and destroys itself.
|
|
@@ -3654,7 +3594,9 @@ class FSNamesystem implements FSConstants {
|
|
|
*/
|
|
|
synchronized boolean isOn() {
|
|
|
try {
|
|
|
- isConsistent(); // SHV this is an assert
|
|
|
+ assert isConsistent() : " SafeMode: Inconsistent filesystem state: "
|
|
|
+ + "Total num of blocks, active blocks, or "
|
|
|
+ + "total safe blocks don't match.";
|
|
|
} catch( IOException e ) {
|
|
|
System.err.print( StringUtils.stringifyException( e ));
|
|
|
}
|
|
@@ -3801,26 +3743,20 @@ class FSNamesystem implements FSConstants {
|
|
|
|
|
|
/**
|
|
|
* Checks consistency of the class state.
|
|
|
+ * This is costly and currently called only in assert.
|
|
|
*/
|
|
|
- void isConsistent() throws IOException {
|
|
|
+ boolean isConsistent() throws IOException {
|
|
|
if( blockTotal == -1 && blockSafe == -1 ) {
|
|
|
- return; // manual safe mode
|
|
|
- }
|
|
|
- int activeBlocks = dir.activeBlocks.size();
|
|
|
- if( blockTotal != activeBlocks )
|
|
|
- throw new IOException( "blockTotal " + blockTotal
|
|
|
- + " does not match all blocks count. "
|
|
|
- + "activeBlocks = " + activeBlocks
|
|
|
- + ". safeBlocks = " + blockSafe
|
|
|
- + " safeMode is: "
|
|
|
- + ((safeMode == null) ? "null" : safeMode.toString()) );
|
|
|
- if( blockSafe < 0 || blockSafe > blockTotal )
|
|
|
- throw new IOException( "blockSafe " + blockSafe
|
|
|
- + " is out of range [0," + blockTotal + "]. "
|
|
|
- + "activeBlocks = " + activeBlocks
|
|
|
- + " safeMode is: "
|
|
|
- + ((safeMode == null) ? "null" : safeMode.toString()) );
|
|
|
- }
|
|
|
+ return true; // manual safe mode
|
|
|
+ }
|
|
|
+ int activeBlocks = blocksMap.map.size();
|
|
|
+ for( Iterator<Collection<Block>> it =
|
|
|
+ recentInvalidateSets.values().iterator(); it.hasNext(); ) {
|
|
|
+ activeBlocks -= it.next().size();
|
|
|
+ }
|
|
|
+ return ( blockTotal == activeBlocks ) ||
|
|
|
+ ( blockSafe >= 0 && blockSafe <= blockTotal );
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -3880,10 +3816,10 @@ class FSNamesystem implements FSConstants {
|
|
|
* Decrement number of blocks that reached minimal replication.
|
|
|
* @param replication current replication
|
|
|
*/
|
|
|
- void decrementSafeBlockCount( int replication ) {
|
|
|
- if( safeMode == null )
|
|
|
+ void decrementSafeBlockCount( Block b ) {
|
|
|
+ if( safeMode == null ) // mostly true
|
|
|
return;
|
|
|
- safeMode.decrementSafeBlockCount( (short)replication );
|
|
|
+ safeMode.decrementSafeBlockCount( (short)countContainingNodes( b ) );
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -3892,7 +3828,7 @@ class FSNamesystem implements FSConstants {
|
|
|
void setBlockTotal() {
|
|
|
if( safeMode == null )
|
|
|
return;
|
|
|
- safeMode.setBlockTotal( dir.activeBlocks.size() );
|
|
|
+ safeMode.setBlockTotal( blocksMap.map.size() );
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -4015,4 +3951,175 @@ class FSNamesystem implements FSConstants {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Returns whether the given block is one pointed-to by a file.
|
|
|
+ */
|
|
|
+ public boolean isValidBlock(Block b) {
|
|
|
+ return blocksMap.getINode( b ) != null;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * This class maintains the map from a block to its metadata.
|
|
|
+ * block's metadata currently includes INode it belongs to and
|
|
|
+ * the datanodes that store the block.
|
|
|
+ */
|
|
|
+ class BlocksMap {
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Internal class for block metadata
|
|
|
+ */
|
|
|
+ private class BlockInfo {
|
|
|
+ FSDirectory.INode inode;
|
|
|
+
|
|
|
+ /** nodes could contain some null entries at the end, so
|
|
|
+ * nodes.length >= number of datanodes.
|
|
|
+ * if nodes != null then nodes[0] != null.
|
|
|
+ */
|
|
|
+ DatanodeDescriptor nodes[];
|
|
|
+ Block block; //block that was inserted.
|
|
|
+ }
|
|
|
+
|
|
|
+ private class NodeIterator implements Iterator<DatanodeDescriptor> {
|
|
|
+ NodeIterator( DatanodeDescriptor[] nodes ) {
|
|
|
+ arr = nodes;
|
|
|
+ }
|
|
|
+ DatanodeDescriptor[] arr;
|
|
|
+ int nextIdx = 0;
|
|
|
+
|
|
|
+ public boolean hasNext() {
|
|
|
+ return arr != null && nextIdx < arr.length && arr[nextIdx] != null;
|
|
|
+ }
|
|
|
+
|
|
|
+ public DatanodeDescriptor next() {
|
|
|
+ return arr[nextIdx++];
|
|
|
+ }
|
|
|
+
|
|
|
+ public void remove() {
|
|
|
+ throw new UnsupportedOperationException( "Sorry. can't remove." );
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ Map<Block, BlockInfo> map = new HashMap<Block, BlockInfo>();
|
|
|
+
|
|
|
+ /** add BlockInfo if mapping does not exist */
|
|
|
+ private BlockInfo checkBlockInfo( Block b ) {
|
|
|
+ BlockInfo info = map.get( b );
|
|
|
+ if ( info == null ) {
|
|
|
+ info = new BlockInfo();
|
|
|
+ info.block = b;
|
|
|
+ map.put( b, info );
|
|
|
+ }
|
|
|
+ return info;
|
|
|
+ }
|
|
|
+
|
|
|
+ public FSDirectory.INode getINode( Block b ) {
|
|
|
+ BlockInfo info = map.get( b );
|
|
|
+ return ( info != null ) ? info.inode : null;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void addINode( Block b, FSDirectory.INode iNode ) {
|
|
|
+ BlockInfo info = checkBlockInfo( b );
|
|
|
+ info.inode = iNode;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void removeINode( Block b ) {
|
|
|
+ BlockInfo info = map.get( b );
|
|
|
+ if ( info != null ) {
|
|
|
+ info.inode = null;
|
|
|
+ if ( info.nodes == null ) {
|
|
|
+ map.remove( b );
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /** Returns the block object if it exists in the map */
|
|
|
+ public Block getStoredBlock( Block b ) {
|
|
|
+ BlockInfo info = map.get( b );
|
|
|
+ return ( info != null ) ? info.block : null;
|
|
|
+ }
|
|
|
+
|
|
|
+ /** Returned Iterator does not support remove(). */
|
|
|
+ public Iterator<DatanodeDescriptor> nodeIterator( Block b ) {
|
|
|
+ BlockInfo info = map.get( b );
|
|
|
+ return new NodeIterator( ( info != null ) ? info.nodes : null );
|
|
|
+ }
|
|
|
+
|
|
|
+ /** counts number of containing nodes. Better than using iterator. */
|
|
|
+ public int numNodes( Block b ) {
|
|
|
+ int count = 0;
|
|
|
+ BlockInfo info = map.get( b );
|
|
|
+ if ( info != null && info.nodes != null ) {
|
|
|
+ count = info.nodes.length;
|
|
|
+ while ( info.nodes[ count-1 ] == null ) // mostly false
|
|
|
+ count--;
|
|
|
+ }
|
|
|
+ return count;
|
|
|
+ }
|
|
|
+
|
|
|
+ /** returns true if the node does not already exists and is added.
|
|
|
+ * false if the node already exists.*/
|
|
|
+ public boolean addNode( Block b,
|
|
|
+ DatanodeDescriptor node,
|
|
|
+ int replicationHint ) {
|
|
|
+ BlockInfo info = checkBlockInfo( b );
|
|
|
+ if ( info.nodes == null ) {
|
|
|
+ info.nodes = new DatanodeDescriptor[ replicationHint ];
|
|
|
+ }
|
|
|
+
|
|
|
+ DatanodeDescriptor[] arr = info.nodes;
|
|
|
+ for( int i=0; i < arr.length; i++ ) {
|
|
|
+ if ( arr[i] == null ) {
|
|
|
+ arr[i] = node;
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ if ( arr[i] == node ) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /* Not enough space left. Create a new array. Should normally
|
|
|
+ * happen only when replication is manually increased by the user. */
|
|
|
+ info.nodes = new DatanodeDescriptor[ arr.length + 1 ];
|
|
|
+ for( int i=0; i < arr.length; i++ ) {
|
|
|
+ info.nodes[i] = arr[i];
|
|
|
+ }
|
|
|
+ info.nodes[ arr.length ] = node;
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ public boolean removeNode( Block b, DatanodeDescriptor node ) {
|
|
|
+ BlockInfo info = map.get( b );
|
|
|
+ if ( info == null || info.nodes == null )
|
|
|
+ return false;
|
|
|
+
|
|
|
+ boolean removed = false;
|
|
|
+ // swap lastNode and node's location. set lastNode to null.
|
|
|
+ DatanodeDescriptor[] arr = info.nodes;
|
|
|
+ int lastNode = -1;
|
|
|
+ for( int i=arr.length-1; i >= 0; i-- ) {
|
|
|
+ if ( lastNode < 0 && arr[i] != null )
|
|
|
+ lastNode = i;
|
|
|
+ if ( arr[i] == node ) {
|
|
|
+ arr[i] = arr[ lastNode ];
|
|
|
+ arr[ lastNode ] = null;
|
|
|
+ removed = true;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * if ( (lastNode + 1) < arr.length/4 ) {
|
|
|
+ * we could trim the array.
|
|
|
+ * }
|
|
|
+ */
|
|
|
+ if ( arr[0] == null ) { // no datanodes left.
|
|
|
+ info.nodes = null;
|
|
|
+ if ( info.inode == null ) {
|
|
|
+ map.remove( b );
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return removed;
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|