|
@@ -61,8 +61,8 @@ class FSNamesystem implements FSConstants {
|
|
|
// to client-sent information.
|
|
|
// Mapping: Block -> TreeSet<DatanodeDescriptor>
|
|
|
//
|
|
|
- Map<Block, SortedSet<DatanodeDescriptor>> blocksMap =
|
|
|
- new HashMap<Block, SortedSet<DatanodeDescriptor>>();
|
|
|
+ Map<Block, List<DatanodeDescriptor>> blocksMap =
|
|
|
+ new HashMap<Block, List<DatanodeDescriptor>>();
|
|
|
|
|
|
/**
|
|
|
* Stores the datanode -> block map.
|
|
@@ -182,6 +182,8 @@ class FSNamesystem implements FSConstants {
|
|
|
private int maxReplicationStreams;
|
|
|
// MIN_REPLICATION is how many copies we need in place or else we disallow the write
|
|
|
private int minReplication;
|
|
|
+ // Default replication
|
|
|
+ private int defaultReplication;
|
|
|
// heartbeatRecheckInterval is how often namenode checks for expired datanodes
|
|
|
private long heartbeatRecheckInterval;
|
|
|
// heartbeatExpireInterval is how long namenode waits for datanode to report
|
|
@@ -211,6 +213,7 @@ class FSNamesystem implements FSConstants {
|
|
|
int port,
|
|
|
NameNode nn, Configuration conf) throws IOException {
|
|
|
fsNamesystemObject = this;
|
|
|
+ this.defaultReplication = conf.getInt("dfs.replication", 3);
|
|
|
this.maxReplication = conf.getInt("dfs.replication.max", 512);
|
|
|
this.minReplication = conf.getInt("dfs.replication.min", 1);
|
|
|
if( minReplication <= 0 )
|
|
@@ -524,7 +527,7 @@ class FSNamesystem implements FSConstants {
|
|
|
DatanodeDescriptor machineSets[][] = new DatanodeDescriptor[blocks.length][];
|
|
|
|
|
|
for (int i = 0; i < blocks.length; i++) {
|
|
|
- SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
|
|
|
+ Collection<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
|
|
|
if (containingNodes == null) {
|
|
|
machineSets[i] = new DatanodeDescriptor[0];
|
|
|
} else {
|
|
@@ -889,22 +892,16 @@ class FSNamesystem implements FSConstants {
|
|
|
//
|
|
|
// We have the pending blocks, but they won't have
|
|
|
// length info in them (as they were allocated before
|
|
|
- // data-write took place). So we need to add the correct
|
|
|
- // length info to each
|
|
|
- //
|
|
|
- // REMIND - mjc - this is very inefficient! We should
|
|
|
- // improve this!
|
|
|
+ // data-write took place). Find the block stored in
|
|
|
+ // node descriptor.
|
|
|
//
|
|
|
for (int i = 0; i < nrBlocks; i++) {
|
|
|
Block b = pendingBlocks[i];
|
|
|
- SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(b);
|
|
|
- DatanodeDescriptor node = containingNodes.first();
|
|
|
- for (Iterator<Block> it = node.getBlockIterator(); it.hasNext(); ) {
|
|
|
- Block cur = it.next();
|
|
|
- if (b.getBlockId() == cur.getBlockId()) {
|
|
|
- b.setNumBytes(cur.getNumBytes());
|
|
|
- break;
|
|
|
- }
|
|
|
+ List<DatanodeDescriptor> containingNodes = blocksMap.get(b);
|
|
|
+ Block storedBlock =
|
|
|
+ containingNodes.get(0).getBlock(b);
|
|
|
+ if ( storedBlock != null ) {
|
|
|
+ pendingBlocks[i] = storedBlock;
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -946,7 +943,7 @@ class FSNamesystem implements FSConstants {
|
|
|
// the blocks.
|
|
|
int numExpectedReplicas = pendingFile.getReplication();
|
|
|
for (int i = 0; i < nrBlocks; i++) {
|
|
|
- SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(pendingBlocks[i]);
|
|
|
+ Collection<DatanodeDescriptor> containingNodes = blocksMap.get(pendingBlocks[i]);
|
|
|
// filter out containingNodes that are marked for decommission.
|
|
|
int numCurrentReplica = countContainingNodes(containingNodes);
|
|
|
|
|
@@ -986,7 +983,7 @@ class FSNamesystem implements FSConstants {
|
|
|
|
|
|
for (Iterator<Block> it = v.getBlocks().iterator(); it.hasNext(); ) {
|
|
|
Block b = it.next();
|
|
|
- SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(b);
|
|
|
+ Collection<DatanodeDescriptor> containingNodes = blocksMap.get(b);
|
|
|
if (containingNodes == null || containingNodes.size() < this.minReplication) {
|
|
|
return false;
|
|
|
}
|
|
@@ -1077,7 +1074,7 @@ class FSNamesystem implements FSConstants {
|
|
|
for (int i = 0; i < deletedBlocks.length; i++) {
|
|
|
Block b = deletedBlocks[i];
|
|
|
|
|
|
- SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(b);
|
|
|
+ Collection<DatanodeDescriptor> containingNodes = blocksMap.get(b);
|
|
|
if (containingNodes != null) {
|
|
|
for (Iterator<DatanodeDescriptor> it = containingNodes.iterator(); it.hasNext(); ) {
|
|
|
DatanodeDescriptor node = it.next();
|
|
@@ -1201,7 +1198,7 @@ class FSNamesystem implements FSConstants {
|
|
|
} else {
|
|
|
String hosts[][] = new String[(endBlock - startBlock) + 1][];
|
|
|
for (int i = startBlock; i <= endBlock; i++) {
|
|
|
- SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
|
|
|
+ Collection<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
|
|
|
Collection<String> v = new ArrayList<String>();
|
|
|
if (containingNodes != null) {
|
|
|
for (Iterator<DatanodeDescriptor> it =containingNodes.iterator(); it.hasNext();) {
|
|
@@ -1924,12 +1921,16 @@ class FSNamesystem implements FSConstants {
|
|
|
// between the old and new block report.
|
|
|
//
|
|
|
int newPos = 0;
|
|
|
- boolean modified = false;
|
|
|
Iterator<Block> iter = node.getBlockIterator();
|
|
|
Block oldblk = iter.hasNext() ? iter.next() : null;
|
|
|
Block newblk = (newReport != null && newReport.length > 0) ?
|
|
|
newReport[0] : null;
|
|
|
|
|
|
+ // common case is that most of the blocks from the datanode
|
|
|
+ // matches blocks in datanode descriptor.
|
|
|
+ Collection<Block> toRemove = new LinkedList<Block>();
|
|
|
+ Collection<Block> toAdd = new LinkedList<Block>();
|
|
|
+
|
|
|
while (oldblk != null || newblk != null) {
|
|
|
|
|
|
int cmp = (oldblk == null) ? 1 :
|
|
@@ -1943,25 +1944,27 @@ class FSNamesystem implements FSConstants {
|
|
|
? newReport[newPos] : null;
|
|
|
} else if (cmp < 0) {
|
|
|
// The old report has a block the new one does not
|
|
|
- removeStoredBlock(oldblk, node);
|
|
|
- modified = true;
|
|
|
+ toRemove.add(oldblk);
|
|
|
oldblk = iter.hasNext() ? iter.next() : null;
|
|
|
} else {
|
|
|
// The new report has a block the old one does not
|
|
|
- addStoredBlock(newblk, node);
|
|
|
- modified = true;
|
|
|
+ toAdd.add(newblk);
|
|
|
newPos++;
|
|
|
newblk = (newPos < newReport.length)
|
|
|
? newReport[newPos] : null;
|
|
|
}
|
|
|
}
|
|
|
- //
|
|
|
- // Modify node so it has the new blockreport
|
|
|
- //
|
|
|
- if (modified) {
|
|
|
- node.updateBlocks(newReport);
|
|
|
+
|
|
|
+ for ( Iterator<Block> i = toRemove.iterator(); i.hasNext(); ) {
|
|
|
+ Block b = i.next();
|
|
|
+ removeStoredBlock( b, node );
|
|
|
+ node.removeBlock( b );
|
|
|
}
|
|
|
-
|
|
|
+ for ( Iterator<Block> i = toAdd.iterator(); i.hasNext(); ) {
|
|
|
+ Block b = i.next();
|
|
|
+ node.addBlock( addStoredBlock(b, node) );
|
|
|
+ }
|
|
|
+
|
|
|
//
|
|
|
// We've now completely updated the node's block report profile.
|
|
|
// We now go through all its blocks and find which ones are invalid,
|
|
@@ -1990,12 +1993,27 @@ class FSNamesystem implements FSConstants {
|
|
|
/**
|
|
|
* Modify (block-->datanode) map. Remove block from set of
|
|
|
* needed replications if this takes care of the problem.
|
|
|
+ * @return the block that is stored in blockMap.
|
|
|
*/
|
|
|
- synchronized void addStoredBlock(Block block, DatanodeDescriptor node) {
|
|
|
- SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(block);
|
|
|
+ synchronized Block addStoredBlock(Block block, DatanodeDescriptor node) {
|
|
|
+ List<DatanodeDescriptor> containingNodes = blocksMap.get(block);
|
|
|
if (containingNodes == null) {
|
|
|
- containingNodes = new TreeSet<DatanodeDescriptor>();
|
|
|
+ //Create an arraylist with the current replication factor
|
|
|
+ FSDirectory.INode inode = dir.getFileByBlock(block);
|
|
|
+ int replication = (inode != null) ?
|
|
|
+ inode.getReplication() : defaultReplication;
|
|
|
+ containingNodes = new ArrayList<DatanodeDescriptor>(replication);
|
|
|
blocksMap.put(block, containingNodes);
|
|
|
+ } else {
|
|
|
+ Block storedBlock =
|
|
|
+ containingNodes.get(0).getBlock(block);
|
|
|
+ // update stored block's length.
|
|
|
+ if ( storedBlock != null ) {
|
|
|
+ if ( block.getNumBytes() > 0 ) {
|
|
|
+ storedBlock.setNumBytes( block.getNumBytes() );
|
|
|
+ }
|
|
|
+ block = storedBlock;
|
|
|
+ }
|
|
|
}
|
|
|
int curReplicaDelta = 0;
|
|
|
if (! containingNodes.contains(node)) {
|
|
@@ -2018,7 +2036,7 @@ class FSNamesystem implements FSConstants {
|
|
|
|
|
|
FSDirectory.INode fileINode = dir.getFileByBlock(block);
|
|
|
if( fileINode == null ) // block does not belong to any file
|
|
|
- return;
|
|
|
+ return block;
|
|
|
|
|
|
// filter out containingNodes that are marked for decommission.
|
|
|
int numCurrentReplica = countContainingNodes(containingNodes);
|
|
@@ -2036,6 +2054,7 @@ class FSNamesystem implements FSConstants {
|
|
|
pendingReplications.remove(block);
|
|
|
}
|
|
|
proccessOverReplicatedBlock( block, fileReplication );
|
|
|
+ return block;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -2044,7 +2063,7 @@ class FSNamesystem implements FSConstants {
|
|
|
* mark them in the excessReplicateMap.
|
|
|
*/
|
|
|
private void proccessOverReplicatedBlock( Block block, short replication ) {
|
|
|
- SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(block);
|
|
|
+ Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block);
|
|
|
if( containingNodes == null )
|
|
|
return;
|
|
|
Collection<DatanodeDescriptor> nonExcess = new ArrayList<DatanodeDescriptor>();
|
|
@@ -2124,7 +2143,7 @@ class FSNamesystem implements FSConstants {
|
|
|
synchronized void removeStoredBlock(Block block, DatanodeDescriptor node) {
|
|
|
NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
|
|
|
+block.getBlockName() + " from "+node.getName() );
|
|
|
- SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(block);
|
|
|
+ Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block);
|
|
|
if (containingNodes == null || ! containingNodes.contains(node)) {
|
|
|
NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
|
|
|
+block.getBlockName()+" has already been removed from node "+node );
|
|
@@ -2182,14 +2201,9 @@ class FSNamesystem implements FSConstants {
|
|
|
NameNode.stateChangeLog.debug("BLOCK* NameSystem.blockReceived: "
|
|
|
+block.getBlockName()+" is received from " + nodeID.getName() );
|
|
|
//
|
|
|
- // Modify the blocks->datanode map
|
|
|
+ // Modify the blocks->datanode map and node's map.
|
|
|
//
|
|
|
- addStoredBlock(block, node);
|
|
|
-
|
|
|
- //
|
|
|
- // Supplement node's blockreport
|
|
|
- //
|
|
|
- node.addBlock(block);
|
|
|
+ node.addBlock( addStoredBlock(block, node) );
|
|
|
}
|
|
|
|
|
|
/**
|