|
@@ -233,7 +233,7 @@ public class BlockManager {
|
|
/**
|
|
/**
|
|
* Get all valid locations of the block
|
|
* Get all valid locations of the block
|
|
*/
|
|
*/
|
|
- ArrayList<String> addBlock(Block block) {
|
|
|
|
|
|
+ ArrayList<String> getValidLocations(Block block) {
|
|
ArrayList<String> machineSet =
|
|
ArrayList<String> machineSet =
|
|
new ArrayList<String>(blocksMap.numNodes(block));
|
|
new ArrayList<String>(blocksMap.numNodes(block));
|
|
for(Iterator<DatanodeDescriptor> it =
|
|
for(Iterator<DatanodeDescriptor> it =
|
|
@@ -248,7 +248,6 @@ public class BlockManager {
|
|
return machineSet;
|
|
return machineSet;
|
|
}
|
|
}
|
|
|
|
|
|
-
|
|
|
|
List<LocatedBlock> getBlockLocations(Block[] blocks, long offset,
|
|
List<LocatedBlock> getBlockLocations(Block[] blocks, long offset,
|
|
long length, int nrBlocksToReturn) throws IOException {
|
|
long length, int nrBlocksToReturn) throws IOException {
|
|
int curBlk = 0;
|
|
int curBlk = 0;
|
|
@@ -396,43 +395,50 @@ public class BlockManager {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- void markBlockAsCorrupt(Block blk, DatanodeInfo dn) throws IOException {
|
|
|
|
|
|
+ void findAndMarkBlockAsCorrupt(Block blk,
|
|
|
|
+ DatanodeInfo dn) throws IOException {
|
|
|
|
+ BlockInfo storedBlock = getStoredBlock(blk);
|
|
|
|
+ if (storedBlock == null) {
|
|
|
|
+ // Check if the replica is in the blockMap, if not
|
|
|
|
+ // ignore the request for now. This could happen when BlockScanner
|
|
|
|
+ // thread of Datanode reports bad block before Block reports are sent
|
|
|
|
+ // by the Datanode on startup
|
|
|
|
+ NameNode.stateChangeLog.info("BLOCK* NameSystem.markBlockAsCorrupt: " +
|
|
|
|
+ "block " + blk + " could not be marked as " +
|
|
|
|
+ "corrupt as it does not exist in blocksMap");
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ markBlockAsCorrupt(storedBlock, dn);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void markBlockAsCorrupt(BlockInfo storedBlock,
|
|
|
|
+ DatanodeInfo dn) throws IOException {
|
|
|
|
+ assert storedBlock != null : "storedBlock should not be null";
|
|
DatanodeDescriptor node = namesystem.getDatanode(dn);
|
|
DatanodeDescriptor node = namesystem.getDatanode(dn);
|
|
if (node == null) {
|
|
if (node == null) {
|
|
- throw new IOException("Cannot mark block" + blk.getBlockName() +
|
|
|
|
|
|
+ throw new IOException("Cannot mark block " +
|
|
|
|
+ storedBlock.getBlockName() +
|
|
" as corrupt because datanode " + dn.getName() +
|
|
" as corrupt because datanode " + dn.getName() +
|
|
" does not exist. ");
|
|
" does not exist. ");
|
|
}
|
|
}
|
|
|
|
|
|
- final BlockInfo storedBlockInfo = blocksMap.getStoredBlock(blk);
|
|
|
|
- if (storedBlockInfo == null) {
|
|
|
|
- // Check if the replica is in the blockMap, if not
|
|
|
|
- // ignore the request for now. This could happen when BlockScanner
|
|
|
|
- // thread of Datanode reports bad block before Block reports are sent
|
|
|
|
- // by the Datanode on startup
|
|
|
|
|
|
+ INodeFile inode = storedBlock.getINode();
|
|
|
|
+ if (inode == null) {
|
|
NameNode.stateChangeLog.info("BLOCK NameSystem.markBlockAsCorrupt: " +
|
|
NameNode.stateChangeLog.info("BLOCK NameSystem.markBlockAsCorrupt: " +
|
|
- "block " + blk + " could not be marked " +
|
|
|
|
- "as corrupt as it does not exists in " +
|
|
|
|
- "blocksMap");
|
|
|
|
|
|
+ "block " + storedBlock +
|
|
|
|
+ " could not be marked as corrupt as it" +
|
|
|
|
+ " does not belong to any file");
|
|
|
|
+ addToInvalidates(storedBlock, node);
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ // Add this replica to corruptReplicas Map
|
|
|
|
+ corruptReplicas.addToCorruptReplicasMap(storedBlock, node);
|
|
|
|
+ if (countNodes(storedBlock).liveReplicas() > inode.getReplication()) {
|
|
|
|
+ // the block is over-replicated so invalidate the replicas immediately
|
|
|
|
+ invalidateBlock(storedBlock, node);
|
|
} else {
|
|
} else {
|
|
- INodeFile inode = storedBlockInfo.getINode();
|
|
|
|
- if (inode == null) {
|
|
|
|
- NameNode.stateChangeLog.info("BLOCK NameSystem.markBlockAsCorrupt: " +
|
|
|
|
- "block " + blk + " could not be marked " +
|
|
|
|
- "as corrupt as it does not belong to " +
|
|
|
|
- "any file");
|
|
|
|
- addToInvalidates(storedBlockInfo, node);
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
- // Add this replica to corruptReplicas Map
|
|
|
|
- corruptReplicas.addToCorruptReplicasMap(storedBlockInfo, node);
|
|
|
|
- if (countNodes(storedBlockInfo).liveReplicas() > inode.getReplication()) {
|
|
|
|
- // the block is over-replicated so invalidate the replicas immediately
|
|
|
|
- invalidateBlock(storedBlockInfo, node);
|
|
|
|
- } else {
|
|
|
|
- // add the block to neededReplication
|
|
|
|
- updateNeededReplications(storedBlockInfo, -1, 0);
|
|
|
|
- }
|
|
|
|
|
|
+ // add the block to neededReplication
|
|
|
|
+ updateNeededReplications(storedBlock, -1, 0);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -843,8 +849,9 @@ public class BlockManager {
|
|
* needed replications if this takes care of the problem.
|
|
* needed replications if this takes care of the problem.
|
|
* @return the block that is stored in blockMap.
|
|
* @return the block that is stored in blockMap.
|
|
*/
|
|
*/
|
|
- private Block addStoredBlock(Block block, DatanodeDescriptor node,
|
|
|
|
- DatanodeDescriptor delNodeHint) {
|
|
|
|
|
|
+ private Block addStoredBlock(final Block block,
|
|
|
|
+ DatanodeDescriptor node,
|
|
|
|
+ DatanodeDescriptor delNodeHint) {
|
|
BlockInfo storedBlock = blocksMap.getStoredBlock(block);
|
|
BlockInfo storedBlock = blocksMap.getStoredBlock(block);
|
|
if (storedBlock == null || storedBlock.getINode() == null) {
|
|
if (storedBlock == null || storedBlock.getINode() == null) {
|
|
// If this block does not belong to anyfile, then we are done.
|
|
// If this block does not belong to anyfile, then we are done.
|
|
@@ -857,30 +864,32 @@ public class BlockManager {
|
|
// it will happen in next block report otherwise.
|
|
// it will happen in next block report otherwise.
|
|
return block;
|
|
return block;
|
|
}
|
|
}
|
|
|
|
+ assert storedBlock != null : "Block must be stored by now";
|
|
|
|
+ INodeFile fileINode = storedBlock.getINode();
|
|
|
|
+ assert fileINode != null : "Block must belong to a file";
|
|
|
|
|
|
// add block to the data-node
|
|
// add block to the data-node
|
|
boolean added = node.addBlock(storedBlock);
|
|
boolean added = node.addBlock(storedBlock);
|
|
|
|
|
|
- assert storedBlock != null : "Block must be stored by now";
|
|
|
|
-
|
|
|
|
if (block != storedBlock) {
|
|
if (block != storedBlock) {
|
|
- if (block.getNumBytes() >= 0) {
|
|
|
|
- long cursize = storedBlock.getNumBytes();
|
|
|
|
|
|
+ long cursize = storedBlock.getNumBytes();
|
|
|
|
+ long newsize = block.getNumBytes();
|
|
|
|
+ if (newsize >= 0) {
|
|
if (cursize == 0) {
|
|
if (cursize == 0) {
|
|
- storedBlock.setNumBytes(block.getNumBytes());
|
|
|
|
- } else if (cursize != block.getNumBytes()) {
|
|
|
|
|
|
+ storedBlock.setNumBytes(newsize);
|
|
|
|
+ } else if (cursize != newsize) {
|
|
FSNamesystem.LOG.warn("Inconsistent size for block " + block +
|
|
FSNamesystem.LOG.warn("Inconsistent size for block " + block +
|
|
" reported from " + node.getName() +
|
|
" reported from " + node.getName() +
|
|
" current size is " + cursize +
|
|
" current size is " + cursize +
|
|
- " reported size is " + block.getNumBytes());
|
|
|
|
|
|
+ " reported size is " + newsize);
|
|
try {
|
|
try {
|
|
- if (cursize > block.getNumBytes()) {
|
|
|
|
|
|
+ if (cursize > newsize) {
|
|
// new replica is smaller in size than existing block.
|
|
// new replica is smaller in size than existing block.
|
|
// Mark the new replica as corrupt.
|
|
// Mark the new replica as corrupt.
|
|
FSNamesystem.LOG.warn("Mark new replica "
|
|
FSNamesystem.LOG.warn("Mark new replica "
|
|
+ block + " from " + node.getName() + " as corrupt "
|
|
+ block + " from " + node.getName() + " as corrupt "
|
|
+ "because length is shorter than existing ones");
|
|
+ "because length is shorter than existing ones");
|
|
- markBlockAsCorrupt(block, node);
|
|
|
|
|
|
+ markBlockAsCorrupt(storedBlock, node);
|
|
} else {
|
|
} else {
|
|
// new replica is larger in size than existing block.
|
|
// new replica is larger in size than existing block.
|
|
// Mark pre-existing replicas as corrupt.
|
|
// Mark pre-existing replicas as corrupt.
|
|
@@ -898,19 +907,12 @@ public class BlockManager {
|
|
FSNamesystem.LOG.warn("Mark existing replica "
|
|
FSNamesystem.LOG.warn("Mark existing replica "
|
|
+ block + " from " + node.getName() + " as corrupt "
|
|
+ block + " from " + node.getName() + " as corrupt "
|
|
+ "because its length is shorter than the new one");
|
|
+ "because its length is shorter than the new one");
|
|
- markBlockAsCorrupt(block, nodes[j]);
|
|
|
|
|
|
+ markBlockAsCorrupt(storedBlock, nodes[j]);
|
|
}
|
|
}
|
|
//
|
|
//
|
|
// change the size of block in blocksMap
|
|
// change the size of block in blocksMap
|
|
//
|
|
//
|
|
- storedBlock = blocksMap.getStoredBlock(block); // extra look up!
|
|
|
|
- if (storedBlock == null) {
|
|
|
|
- FSNamesystem.LOG.warn("Block " + block + " reported from "
|
|
|
|
- + node.getName()
|
|
|
|
- + " does not exist in blockMap. Surprise! Surprise!");
|
|
|
|
- } else {
|
|
|
|
- storedBlock.setNumBytes(block.getNumBytes());
|
|
|
|
- }
|
|
|
|
|
|
+ storedBlock.setNumBytes(newsize);
|
|
}
|
|
}
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
FSNamesystem.LOG.warn("Error in deleting bad block " + block + e);
|
|
FSNamesystem.LOG.warn("Error in deleting bad block " + block + e);
|
|
@@ -918,17 +920,15 @@ public class BlockManager {
|
|
}
|
|
}
|
|
|
|
|
|
// Updated space consumed if required.
|
|
// Updated space consumed if required.
|
|
- INodeFile file = (storedBlock != null) ? storedBlock.getINode() : null;
|
|
|
|
- long diff = (file == null) ? 0 :
|
|
|
|
- (file.getPreferredBlockSize() - storedBlock.getNumBytes());
|
|
|
|
|
|
+ long diff = fileINode.getPreferredBlockSize() - storedBlock.getNumBytes();
|
|
|
|
|
|
- if (diff > 0 && file.isUnderConstruction() &&
|
|
|
|
|
|
+ if (diff > 0 && fileINode.isUnderConstruction() &&
|
|
cursize < storedBlock.getNumBytes()) {
|
|
cursize < storedBlock.getNumBytes()) {
|
|
try {
|
|
try {
|
|
String path = /* For finding parents */
|
|
String path = /* For finding parents */
|
|
- namesystem.leaseManager.findPath((INodeFileUnderConstruction) file);
|
|
|
|
|
|
+ namesystem.leaseManager.findPath((INodeFileUnderConstruction)fileINode);
|
|
namesystem.dir.updateSpaceConsumed(path, 0, -diff
|
|
namesystem.dir.updateSpaceConsumed(path, 0, -diff
|
|
- * file.getReplication());
|
|
|
|
|
|
+ * fileINode.getReplication());
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
FSNamesystem.LOG
|
|
FSNamesystem.LOG
|
|
.warn("Unexpected exception while updating disk space : "
|
|
.warn("Unexpected exception while updating disk space : "
|
|
@@ -936,12 +936,9 @@ public class BlockManager {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- block = storedBlock;
|
|
|
|
}
|
|
}
|
|
- assert storedBlock == block : "Block must be stored by now";
|
|
|
|
|
|
|
|
int curReplicaDelta = 0;
|
|
int curReplicaDelta = 0;
|
|
-
|
|
|
|
if (added) {
|
|
if (added) {
|
|
curReplicaDelta = 1;
|
|
curReplicaDelta = 1;
|
|
//
|
|
//
|
|
@@ -951,20 +948,20 @@ public class BlockManager {
|
|
//
|
|
//
|
|
if (!namesystem.isInSafeMode()) {
|
|
if (!namesystem.isInSafeMode()) {
|
|
NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
|
|
NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
|
|
- + "blockMap updated: " + node.getName() + " is added to " + block
|
|
|
|
- + " size " + block.getNumBytes());
|
|
|
|
|
|
+ + "blockMap updated: " + node.getName() + " is added to " +
|
|
|
|
+ storedBlock + " size " + storedBlock.getNumBytes());
|
|
}
|
|
}
|
|
} else {
|
|
} else {
|
|
NameNode.stateChangeLog.warn("BLOCK* NameSystem.addStoredBlock: "
|
|
NameNode.stateChangeLog.warn("BLOCK* NameSystem.addStoredBlock: "
|
|
- + "Redundant addStoredBlock request received for " + block + " on "
|
|
|
|
- + node.getName() + " size " + block.getNumBytes());
|
|
|
|
|
|
+ + "Redundant addStoredBlock request received for " + storedBlock
|
|
|
|
+ + " on " + node.getName() + " size " + storedBlock.getNumBytes());
|
|
}
|
|
}
|
|
|
|
|
|
// filter out containingNodes that are marked for decommission.
|
|
// filter out containingNodes that are marked for decommission.
|
|
NumberReplicas num = countNodes(storedBlock);
|
|
NumberReplicas num = countNodes(storedBlock);
|
|
int numLiveReplicas = num.liveReplicas();
|
|
int numLiveReplicas = num.liveReplicas();
|
|
int numCurrentReplica = numLiveReplicas
|
|
int numCurrentReplica = numLiveReplicas
|
|
- + pendingReplications.getNumReplicas(block);
|
|
|
|
|
|
+ + pendingReplications.getNumReplicas(storedBlock);
|
|
|
|
|
|
// check whether safe replication is reached for the block
|
|
// check whether safe replication is reached for the block
|
|
namesystem.incrementSafeBlockCount(numCurrentReplica);
|
|
namesystem.incrementSafeBlockCount(numCurrentReplica);
|
|
@@ -973,39 +970,37 @@ public class BlockManager {
|
|
// if file is being actively written to, then do not check
|
|
// if file is being actively written to, then do not check
|
|
// replication-factor here. It will be checked when the file is closed.
|
|
// replication-factor here. It will be checked when the file is closed.
|
|
//
|
|
//
|
|
- INodeFile fileINode = null;
|
|
|
|
- fileINode = storedBlock.getINode();
|
|
|
|
if (fileINode.isUnderConstruction()) {
|
|
if (fileINode.isUnderConstruction()) {
|
|
- return block;
|
|
|
|
|
|
+ return storedBlock;
|
|
}
|
|
}
|
|
|
|
|
|
// do not handle mis-replicated blocks during startup
|
|
// do not handle mis-replicated blocks during startup
|
|
if (namesystem.isInSafeMode())
|
|
if (namesystem.isInSafeMode())
|
|
- return block;
|
|
|
|
|
|
+ return storedBlock;
|
|
|
|
|
|
// handle underReplication/overReplication
|
|
// handle underReplication/overReplication
|
|
short fileReplication = fileINode.getReplication();
|
|
short fileReplication = fileINode.getReplication();
|
|
if (numCurrentReplica >= fileReplication) {
|
|
if (numCurrentReplica >= fileReplication) {
|
|
- neededReplications.remove(block, numCurrentReplica,
|
|
|
|
|
|
+ neededReplications.remove(storedBlock, numCurrentReplica,
|
|
num.decommissionedReplicas, fileReplication);
|
|
num.decommissionedReplicas, fileReplication);
|
|
} else {
|
|
} else {
|
|
- updateNeededReplications(block, curReplicaDelta, 0);
|
|
|
|
|
|
+ updateNeededReplications(storedBlock, curReplicaDelta, 0);
|
|
}
|
|
}
|
|
if (numCurrentReplica > fileReplication) {
|
|
if (numCurrentReplica > fileReplication) {
|
|
- processOverReplicatedBlock(block, fileReplication, node, delNodeHint);
|
|
|
|
|
|
+ processOverReplicatedBlock(storedBlock, fileReplication, node, delNodeHint);
|
|
}
|
|
}
|
|
// If the file replication has reached desired value
|
|
// If the file replication has reached desired value
|
|
// we can remove any corrupt replicas the block may have
|
|
// we can remove any corrupt replicas the block may have
|
|
- int corruptReplicasCount = corruptReplicas.numCorruptReplicas(block);
|
|
|
|
|
|
+ int corruptReplicasCount = corruptReplicas.numCorruptReplicas(storedBlock);
|
|
int numCorruptNodes = num.corruptReplicas();
|
|
int numCorruptNodes = num.corruptReplicas();
|
|
if (numCorruptNodes != corruptReplicasCount) {
|
|
if (numCorruptNodes != corruptReplicasCount) {
|
|
FSNamesystem.LOG.warn("Inconsistent number of corrupt replicas for " +
|
|
FSNamesystem.LOG.warn("Inconsistent number of corrupt replicas for " +
|
|
- block + "blockMap has " + numCorruptNodes +
|
|
|
|
|
|
+ storedBlock + "blockMap has " + numCorruptNodes +
|
|
" but corrupt replicas map has " + corruptReplicasCount);
|
|
" but corrupt replicas map has " + corruptReplicasCount);
|
|
}
|
|
}
|
|
if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication))
|
|
if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication))
|
|
- invalidateCorruptReplicas(block);
|
|
|
|
- return block;
|
|
|
|
|
|
+ invalidateCorruptReplicas(storedBlock);
|
|
|
|
+ return storedBlock;
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|