|
@@ -59,8 +59,8 @@ class FSNamesystem implements FSConstants {
|
|
|
// to client-sent information.
|
|
|
// Mapping: Block -> TreeSet<DatanodeDescriptor>
|
|
|
//
|
|
|
- Map<Block, SortedSet<DatanodeDescriptor>> blocksMap =
|
|
|
- new HashMap<Block, SortedSet<DatanodeDescriptor>>();
|
|
|
+ Map<Block, List<DatanodeDescriptor>> blocksMap =
|
|
|
+ new HashMap<Block, List<DatanodeDescriptor>>();
|
|
|
|
|
|
/**
|
|
|
* Stores the datanode -> block map.
|
|
@@ -179,6 +179,8 @@ class FSNamesystem implements FSConstants {
|
|
|
private int maxReplicationStreams;
|
|
|
// MIN_REPLICATION is how many copies we need in place or else we disallow the write
|
|
|
private int minReplication;
|
|
|
+ // Default replication
|
|
|
+ private int defaultReplication;
|
|
|
// heartbeatRecheckInterval is how often namenode checks for expired datanodes
|
|
|
private long heartbeatRecheckInterval;
|
|
|
// heartbeatExpireInterval is how long namenode waits for datanode to report
|
|
@@ -199,6 +201,7 @@ class FSNamesystem implements FSConstants {
|
|
|
int port,
|
|
|
NameNode nn, Configuration conf) throws IOException {
|
|
|
fsNamesystemObject = this;
|
|
|
+ this.defaultReplication = conf.getInt("dfs.replication", 3);
|
|
|
this.maxReplication = conf.getInt("dfs.replication.max", 512);
|
|
|
this.minReplication = conf.getInt("dfs.replication.min", 1);
|
|
|
if( minReplication <= 0 )
|
|
@@ -299,7 +302,7 @@ class FSNamesystem implements FSConstants {
|
|
|
DatanodeDescriptor machineSets[][] = new DatanodeDescriptor[blocks.length][];
|
|
|
|
|
|
for (int i = 0; i < blocks.length; i++) {
|
|
|
- SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
|
|
|
+ List<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
|
|
|
if (containingNodes == null) {
|
|
|
machineSets[i] = new DatanodeDescriptor[0];
|
|
|
} else {
|
|
@@ -660,22 +663,16 @@ class FSNamesystem implements FSConstants {
|
|
|
//
|
|
|
// We have the pending blocks, but they won't have
|
|
|
// length info in them (as they were allocated before
|
|
|
- // data-write took place). So we need to add the correct
|
|
|
- // length info to each
|
|
|
- //
|
|
|
- // REMIND - mjc - this is very inefficient! We should
|
|
|
- // improve this!
|
|
|
+ // data-write took place). Find the block stored in
|
|
|
+ // node descriptor.
|
|
|
//
|
|
|
for (int i = 0; i < nrBlocks; i++) {
|
|
|
Block b = pendingBlocks[i];
|
|
|
- SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(b);
|
|
|
- DatanodeDescriptor node = containingNodes.first();
|
|
|
- for (Iterator<Block> it = node.getBlockIterator(); it.hasNext(); ) {
|
|
|
- Block cur = it.next();
|
|
|
- if (b.getBlockId() == cur.getBlockId()) {
|
|
|
- b.setNumBytes(cur.getNumBytes());
|
|
|
- break;
|
|
|
- }
|
|
|
+ List<DatanodeDescriptor> containingNodes = blocksMap.get(b);
|
|
|
+ Block storedBlock =
|
|
|
+ containingNodes.get(0).getBlock(b.getBlockId());
|
|
|
+ if ( storedBlock != null ) {
|
|
|
+ pendingBlocks[i] = storedBlock;
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -716,7 +713,7 @@ class FSNamesystem implements FSConstants {
|
|
|
// Now that the file is real, we need to be sure to replicate
|
|
|
// the blocks.
|
|
|
for (int i = 0; i < nrBlocks; i++) {
|
|
|
- SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(pendingBlocks[i]);
|
|
|
+ List<DatanodeDescriptor> containingNodes = blocksMap.get(pendingBlocks[i]);
|
|
|
// filter out containingNodes that are marked for decommission.
|
|
|
int numCurrentReplica = countContainingNodes(containingNodes);
|
|
|
|
|
@@ -761,7 +758,7 @@ class FSNamesystem implements FSConstants {
|
|
|
|
|
|
for (Iterator<Block> it = v.getBlocks().iterator(); it.hasNext(); ) {
|
|
|
Block b = it.next();
|
|
|
- SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(b);
|
|
|
+ List<DatanodeDescriptor> containingNodes = blocksMap.get(b);
|
|
|
if (containingNodes == null || containingNodes.size() < this.minReplication) {
|
|
|
return false;
|
|
|
}
|
|
@@ -806,7 +803,7 @@ class FSNamesystem implements FSConstants {
|
|
|
for (int i = 0; i < deletedBlocks.length; i++) {
|
|
|
Block b = deletedBlocks[i];
|
|
|
|
|
|
- SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(b);
|
|
|
+ List<DatanodeDescriptor> containingNodes = blocksMap.get(b);
|
|
|
if (containingNodes != null) {
|
|
|
for (Iterator<DatanodeDescriptor> it = containingNodes.iterator(); it.hasNext(); ) {
|
|
|
DatanodeDescriptor node = it.next();
|
|
@@ -935,7 +932,7 @@ class FSNamesystem implements FSConstants {
|
|
|
} else {
|
|
|
String hosts[][] = new String[(endBlock - startBlock) + 1][];
|
|
|
for (int i = startBlock; i <= endBlock; i++) {
|
|
|
- SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
|
|
|
+ List<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
|
|
|
Collection<String> v = new ArrayList<String>();
|
|
|
if (containingNodes != null) {
|
|
|
for (Iterator<DatanodeDescriptor> it =containingNodes.iterator(); it.hasNext();) {
|
|
@@ -1494,12 +1491,16 @@ class FSNamesystem implements FSConstants {
|
|
|
// between the old and new block report.
|
|
|
//
|
|
|
int newPos = 0;
|
|
|
- boolean modified = false;
|
|
|
Iterator<Block> iter = node.getBlockIterator();
|
|
|
Block oldblk = iter.hasNext() ? iter.next() : null;
|
|
|
Block newblk = (newReport != null && newReport.length > 0) ?
|
|
|
newReport[0] : null;
|
|
|
|
|
|
+ // common case is that most of the blocks from the datanode
|
|
|
+ // matches blocks in datanode descriptor.
|
|
|
+ Collection<Block> toRemove = new LinkedList<Block>();
|
|
|
+ Collection<Block> toAdd = new LinkedList<Block>();
|
|
|
+
|
|
|
while (oldblk != null || newblk != null) {
|
|
|
|
|
|
int cmp = (oldblk == null) ? 1 :
|
|
@@ -1513,25 +1514,25 @@ class FSNamesystem implements FSConstants {
|
|
|
? newReport[newPos] : null;
|
|
|
} else if (cmp < 0) {
|
|
|
// The old report has a block the new one does not
|
|
|
+ toRemove.add(oldblk);
|
|
|
removeStoredBlock(oldblk, node);
|
|
|
- modified = true;
|
|
|
oldblk = iter.hasNext() ? iter.next() : null;
|
|
|
} else {
|
|
|
// The new report has a block the old one does not
|
|
|
- addStoredBlock(newblk, node);
|
|
|
- modified = true;
|
|
|
+ toAdd.add(addStoredBlock(newblk, node));
|
|
|
newPos++;
|
|
|
newblk = (newPos < newReport.length)
|
|
|
? newReport[newPos] : null;
|
|
|
}
|
|
|
}
|
|
|
- //
|
|
|
- // Modify node so it has the new blockreport
|
|
|
- //
|
|
|
- if (modified) {
|
|
|
- node.updateBlocks(newReport);
|
|
|
+
|
|
|
+ for ( Iterator<Block> i = toRemove.iterator(); i.hasNext(); ) {
|
|
|
+ node.removeBlock( i.next() );
|
|
|
}
|
|
|
-
|
|
|
+ for ( Iterator<Block> i = toAdd.iterator(); i.hasNext(); ) {
|
|
|
+ node.addBlock( i.next() );
|
|
|
+ }
|
|
|
+
|
|
|
//
|
|
|
// We've now completely updated the node's block report profile.
|
|
|
// We now go through all its blocks and find which ones are invalid,
|
|
@@ -1560,12 +1561,25 @@ class FSNamesystem implements FSConstants {
|
|
|
/**
|
|
|
* Modify (block-->datanode) map. Remove block from set of
|
|
|
* needed replications if this takes care of the problem.
|
|
|
+ * @return the block that is stored in blockMap.
|
|
|
*/
|
|
|
- synchronized void addStoredBlock(Block block, DatanodeDescriptor node) {
|
|
|
- SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(block);
|
|
|
+ synchronized Block addStoredBlock(Block block, DatanodeDescriptor node) {
|
|
|
+ List<DatanodeDescriptor> containingNodes = blocksMap.get(block);
|
|
|
if (containingNodes == null) {
|
|
|
- containingNodes = new TreeSet<DatanodeDescriptor>();
|
|
|
+ //Create an arraylist with the current replication factor
|
|
|
+ FSDirectory.INode inode = dir.getFileByBlock(block);
|
|
|
+ int replication = (inode != null) ?
|
|
|
+ inode.getReplication() : defaultReplication;
|
|
|
+ containingNodes = new ArrayList<DatanodeDescriptor>(replication);
|
|
|
blocksMap.put(block, containingNodes);
|
|
|
+ } else {
|
|
|
+ Block storedBlock =
|
|
|
+ containingNodes.get(0).getBlock(block.getBlockId());
|
|
|
+ // update stored block's length.
|
|
|
+ if ( block.getNumBytes() > 0 ) {
|
|
|
+ storedBlock.setNumBytes( block.getNumBytes() );
|
|
|
+ }
|
|
|
+ block = storedBlock;
|
|
|
}
|
|
|
if (! containingNodes.contains(node)) {
|
|
|
containingNodes.add(node);
|
|
@@ -1587,7 +1601,7 @@ class FSNamesystem implements FSConstants {
|
|
|
synchronized (neededReplications) {
|
|
|
FSDirectory.INode fileINode = dir.getFileByBlock(block);
|
|
|
if( fileINode == null ) // block does not belong to any file
|
|
|
- return;
|
|
|
+ return block;
|
|
|
|
|
|
// filter out containingNodes that are marked for decommission.
|
|
|
int numCurrentReplica = countContainingNodes(containingNodes);
|
|
@@ -1612,6 +1626,7 @@ class FSNamesystem implements FSConstants {
|
|
|
|
|
|
proccessOverReplicatedBlock( block, fileReplication );
|
|
|
}
|
|
|
+ return block;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1620,7 +1635,7 @@ class FSNamesystem implements FSConstants {
|
|
|
* mark them in the excessReplicateMap.
|
|
|
*/
|
|
|
private void proccessOverReplicatedBlock( Block block, short replication ) {
|
|
|
- SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(block);
|
|
|
+ List<DatanodeDescriptor> containingNodes = blocksMap.get(block);
|
|
|
if( containingNodes == null )
|
|
|
return;
|
|
|
Collection<DatanodeDescriptor> nonExcess = new ArrayList<DatanodeDescriptor>();
|
|
@@ -1700,7 +1715,7 @@ class FSNamesystem implements FSConstants {
|
|
|
synchronized void removeStoredBlock(Block block, DatanodeDescriptor node) {
|
|
|
NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
|
|
|
+block.getBlockName() + " from "+node.getName() );
|
|
|
- SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(block);
|
|
|
+ List<DatanodeDescriptor> containingNodes = blocksMap.get(block);
|
|
|
if (containingNodes == null || ! containingNodes.contains(node)) {
|
|
|
NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
|
|
|
+block.getBlockName()+" has already been removed from node "+node );
|
|
@@ -1759,14 +1774,9 @@ class FSNamesystem implements FSConstants {
|
|
|
NameNode.stateChangeLog.debug("BLOCK* NameSystem.blockReceived: "
|
|
|
+block.getBlockName()+" is received from " + nodeID.getName() );
|
|
|
//
|
|
|
- // Modify the blocks->datanode map
|
|
|
+ // Modify the blocks->datanode map and node's map.
|
|
|
//
|
|
|
- addStoredBlock(block, node);
|
|
|
-
|
|
|
- //
|
|
|
- // Supplement node's blockreport
|
|
|
- //
|
|
|
- node.addBlock(block);
|
|
|
+ node.addBlock( addStoredBlock(block, node) );
|
|
|
}
|
|
|
|
|
|
/**
|