|
@@ -59,8 +59,8 @@ class FSNamesystem implements FSConstants {
|
|
|
// to client-sent information.
|
|
|
// Mapping: Block -> TreeSet<DatanodeDescriptor>
|
|
|
//
|
|
|
- Map<Block, List<DatanodeDescriptor>> blocksMap =
|
|
|
- new HashMap<Block, List<DatanodeDescriptor>>();
|
|
|
+ Map<Block, SortedSet<DatanodeDescriptor>> blocksMap =
|
|
|
+ new HashMap<Block, SortedSet<DatanodeDescriptor>>();
|
|
|
|
|
|
/**
|
|
|
* Stores the datanode -> block map.
|
|
@@ -179,8 +179,6 @@ class FSNamesystem implements FSConstants {
|
|
|
private int maxReplicationStreams;
|
|
|
// MIN_REPLICATION is how many copies we need in place or else we disallow the write
|
|
|
private int minReplication;
|
|
|
- // Default replication
|
|
|
- private int defaultReplication;
|
|
|
// heartbeatRecheckInterval is how often namenode checks for expired datanodes
|
|
|
private long heartbeatRecheckInterval;
|
|
|
// heartbeatExpireInterval is how long namenode waits for datanode to report
|
|
@@ -201,7 +199,6 @@ class FSNamesystem implements FSConstants {
|
|
|
int port,
|
|
|
NameNode nn, Configuration conf) throws IOException {
|
|
|
fsNamesystemObject = this;
|
|
|
- this.defaultReplication = conf.getInt("dfs.replication", 3);
|
|
|
this.maxReplication = conf.getInt("dfs.replication.max", 512);
|
|
|
this.minReplication = conf.getInt("dfs.replication.min", 1);
|
|
|
if( minReplication <= 0 )
|
|
@@ -302,7 +299,7 @@ class FSNamesystem implements FSConstants {
|
|
|
DatanodeDescriptor machineSets[][] = new DatanodeDescriptor[blocks.length][];
|
|
|
|
|
|
for (int i = 0; i < blocks.length; i++) {
|
|
|
- List<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
|
|
|
+ SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
|
|
|
if (containingNodes == null) {
|
|
|
machineSets[i] = new DatanodeDescriptor[0];
|
|
|
} else {
|
|
@@ -663,16 +660,22 @@ class FSNamesystem implements FSConstants {
|
|
|
//
|
|
|
// We have the pending blocks, but they won't have
|
|
|
// length info in them (as they were allocated before
|
|
|
- // data-write took place). Find the block stored in
|
|
|
- // node descriptor.
|
|
|
+ // data-write took place). So we need to add the correct
|
|
|
+ // length info to each pending block.
|
|
|
+ //
|
|
|
+ // REMIND - mjc - this is very inefficient! We should
|
|
|
+ // improve this!
|
|
|
//
|
|
|
for (int i = 0; i < nrBlocks; i++) {
|
|
|
Block b = pendingBlocks[i];
|
|
|
- List<DatanodeDescriptor> containingNodes = blocksMap.get(b);
|
|
|
- Block storedBlock =
|
|
|
- containingNodes.get(0).getBlock(b.getBlockId());
|
|
|
- if ( storedBlock != null ) {
|
|
|
- pendingBlocks[i] = storedBlock;
|
|
|
+ SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(b);
|
|
|
+ DatanodeDescriptor node = containingNodes.first();
|
|
|
+ for (Iterator<Block> it = node.getBlockIterator(); it.hasNext(); ) {
|
|
|
+ Block cur = it.next();
|
|
|
+ if (b.getBlockId() == cur.getBlockId()) {
|
|
|
+ b.setNumBytes(cur.getNumBytes());
|
|
|
+ break;
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -713,7 +716,7 @@ class FSNamesystem implements FSConstants {
|
|
|
// Now that the file is real, we need to be sure to replicate
|
|
|
// the blocks.
|
|
|
for (int i = 0; i < nrBlocks; i++) {
|
|
|
- List<DatanodeDescriptor> containingNodes = blocksMap.get(pendingBlocks[i]);
|
|
|
+ SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(pendingBlocks[i]);
|
|
|
// filter out containingNodes that are marked for decommission.
|
|
|
int numCurrentReplica = countContainingNodes(containingNodes);
|
|
|
|
|
@@ -758,7 +761,7 @@ class FSNamesystem implements FSConstants {
|
|
|
|
|
|
for (Iterator<Block> it = v.getBlocks().iterator(); it.hasNext(); ) {
|
|
|
Block b = it.next();
|
|
|
- List<DatanodeDescriptor> containingNodes = blocksMap.get(b);
|
|
|
+ SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(b);
|
|
|
if (containingNodes == null || containingNodes.size() < this.minReplication) {
|
|
|
return false;
|
|
|
}
|
|
@@ -791,7 +794,7 @@ class FSNamesystem implements FSConstants {
|
|
|
throw new SafeModeException("Cannot invalidate block " + blk.getBlockName(), safeMode);
|
|
|
}
|
|
|
|
|
|
- List<DatanodeDescriptor> containingNodes = blocksMap.get(blk);
|
|
|
+ Collection<DatanodeDescriptor> containingNodes = blocksMap.get(blk);
|
|
|
|
|
|
// Check how many copies we have of the block. If we have at least one
|
|
|
// copy on a live node, then we can delete it.
|
|
@@ -849,7 +852,7 @@ class FSNamesystem implements FSConstants {
|
|
|
for (int i = 0; i < deletedBlocks.length; i++) {
|
|
|
Block b = deletedBlocks[i];
|
|
|
|
|
|
- List<DatanodeDescriptor> containingNodes = blocksMap.get(b);
|
|
|
+ SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(b);
|
|
|
if (containingNodes != null) {
|
|
|
for (Iterator<DatanodeDescriptor> it = containingNodes.iterator(); it.hasNext(); ) {
|
|
|
DatanodeDescriptor node = it.next();
|
|
@@ -973,7 +976,7 @@ class FSNamesystem implements FSConstants {
|
|
|
} else {
|
|
|
String hosts[][] = new String[(endBlock - startBlock) + 1][];
|
|
|
for (int i = startBlock; i <= endBlock; i++) {
|
|
|
- List<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
|
|
|
+ SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
|
|
|
Collection<String> v = new ArrayList<String>();
|
|
|
if (containingNodes != null) {
|
|
|
for (Iterator<DatanodeDescriptor> it =containingNodes.iterator(); it.hasNext();) {
|
|
@@ -1532,16 +1535,12 @@ class FSNamesystem implements FSConstants {
|
|
|
// between the old and new block report.
|
|
|
//
|
|
|
int newPos = 0;
|
|
|
+ boolean modified = false;
|
|
|
Iterator<Block> iter = node.getBlockIterator();
|
|
|
Block oldblk = iter.hasNext() ? iter.next() : null;
|
|
|
Block newblk = (newReport != null && newReport.length > 0) ?
|
|
|
newReport[0] : null;
|
|
|
|
|
|
- // common case is that most of the blocks from the datanode
|
|
|
- // matches blocks in datanode descriptor.
|
|
|
- Collection<Block> toRemove = new LinkedList<Block>();
|
|
|
- Collection<Block> toAdd = new LinkedList<Block>();
|
|
|
-
|
|
|
while (oldblk != null || newblk != null) {
|
|
|
|
|
|
int cmp = (oldblk == null) ? 1 :
|
|
@@ -1555,25 +1554,25 @@ class FSNamesystem implements FSConstants {
|
|
|
? newReport[newPos] : null;
|
|
|
} else if (cmp < 0) {
|
|
|
// The old report has a block the new one does not
|
|
|
- toRemove.add(oldblk);
|
|
|
removeStoredBlock(oldblk, node);
|
|
|
+ modified = true;
|
|
|
oldblk = iter.hasNext() ? iter.next() : null;
|
|
|
} else {
|
|
|
// The new report has a block the old one does not
|
|
|
- toAdd.add(addStoredBlock(newblk, node));
|
|
|
+ addStoredBlock(newblk, node);
|
|
|
+ modified = true;
|
|
|
newPos++;
|
|
|
newblk = (newPos < newReport.length)
|
|
|
? newReport[newPos] : null;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- for ( Iterator<Block> i = toRemove.iterator(); i.hasNext(); ) {
|
|
|
- node.removeBlock( i.next() );
|
|
|
- }
|
|
|
- for ( Iterator<Block> i = toAdd.iterator(); i.hasNext(); ) {
|
|
|
- node.addBlock( i.next() );
|
|
|
+ //
|
|
|
+ // Modify node so it has the new blockreport
|
|
|
+ //
|
|
|
+ if (modified) {
|
|
|
+ node.updateBlocks(newReport);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
//
|
|
|
// We've now completely updated the node's block report profile.
|
|
|
// We now go through all its blocks and find which ones are invalid,
|
|
@@ -1602,25 +1601,12 @@ class FSNamesystem implements FSConstants {
|
|
|
/**
|
|
|
* Modify (block-->datanode) map. Remove block from set of
|
|
|
* needed replications if this takes care of the problem.
|
|
|
- * @return the block that is stored in blockMap.
|
|
|
*/
|
|
|
- synchronized Block addStoredBlock(Block block, DatanodeDescriptor node) {
|
|
|
- List<DatanodeDescriptor> containingNodes = blocksMap.get(block);
|
|
|
+ synchronized void addStoredBlock(Block block, DatanodeDescriptor node) {
|
|
|
+ SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(block);
|
|
|
if (containingNodes == null) {
|
|
|
- //Create an arraylist with the current replication factor
|
|
|
- FSDirectory.INode inode = dir.getFileByBlock(block);
|
|
|
- int replication = (inode != null) ?
|
|
|
- inode.getReplication() : defaultReplication;
|
|
|
- containingNodes = new ArrayList<DatanodeDescriptor>(replication);
|
|
|
+ containingNodes = new TreeSet<DatanodeDescriptor>();
|
|
|
blocksMap.put(block, containingNodes);
|
|
|
- } else {
|
|
|
- Block storedBlock =
|
|
|
- containingNodes.get(0).getBlock(block.getBlockId());
|
|
|
- // update stored block's length.
|
|
|
- if ( block.getNumBytes() > 0 ) {
|
|
|
- storedBlock.setNumBytes( block.getNumBytes() );
|
|
|
- }
|
|
|
- block = storedBlock;
|
|
|
}
|
|
|
if (! containingNodes.contains(node)) {
|
|
|
containingNodes.add(node);
|
|
@@ -1642,7 +1628,7 @@ class FSNamesystem implements FSConstants {
|
|
|
synchronized (neededReplications) {
|
|
|
FSDirectory.INode fileINode = dir.getFileByBlock(block);
|
|
|
if( fileINode == null ) // block does not belong to any file
|
|
|
- return block;
|
|
|
+ return;
|
|
|
|
|
|
// filter out containingNodes that are marked for decommission.
|
|
|
int numCurrentReplica = countContainingNodes(containingNodes);
|
|
@@ -1667,7 +1653,6 @@ class FSNamesystem implements FSConstants {
|
|
|
|
|
|
proccessOverReplicatedBlock( block, fileReplication );
|
|
|
}
|
|
|
- return block;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1676,7 +1661,7 @@ class FSNamesystem implements FSConstants {
|
|
|
* mark them in the excessReplicateMap.
|
|
|
*/
|
|
|
private void proccessOverReplicatedBlock( Block block, short replication ) {
|
|
|
- List<DatanodeDescriptor> containingNodes = blocksMap.get(block);
|
|
|
+ SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(block);
|
|
|
if( containingNodes == null )
|
|
|
return;
|
|
|
Collection<DatanodeDescriptor> nonExcess = new ArrayList<DatanodeDescriptor>();
|
|
@@ -1756,7 +1741,7 @@ class FSNamesystem implements FSConstants {
|
|
|
synchronized void removeStoredBlock(Block block, DatanodeDescriptor node) {
|
|
|
NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
|
|
|
+block.getBlockName() + " from "+node.getName() );
|
|
|
- List<DatanodeDescriptor> containingNodes = blocksMap.get(block);
|
|
|
+ SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(block);
|
|
|
if (containingNodes == null || ! containingNodes.contains(node)) {
|
|
|
NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
|
|
|
+block.getBlockName()+" has already been removed from node "+node );
|
|
@@ -1815,9 +1800,14 @@ class FSNamesystem implements FSConstants {
|
|
|
NameNode.stateChangeLog.debug("BLOCK* NameSystem.blockReceived: "
|
|
|
+block.getBlockName()+" is received from " + nodeID.getName() );
|
|
|
//
|
|
|
- // Modify the blocks->datanode map and node's map.
|
|
|
+ // Modify the blocks->datanode map
|
|
|
//
|
|
|
- node.addBlock( addStoredBlock(block, node) );
|
|
|
+ addStoredBlock(block, node);
|
|
|
+
|
|
|
+ //
|
|
|
+ // Supplement node's blockreport
|
|
|
+ //
|
|
|
+ node.addBlock(block);
|
|
|
}
|
|
|
|
|
|
/**
|